Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ sigma plugin install logpoint
pip3 install pysigma-backend-logpoint
```

You can verify that the backend is registered with Sigma CLI via:

```bash
sigma plugin list --plugin-type backend
```

#### Converting Sigma Rules
Once the packages are successfully installed, you can convert Sigma rules into Logpoint queries using the command below. For example, to convert the
[Suspicious Process Masquerading As SvcHost.EXE](https://github.com/SigmaHQ/sigma/blob/598d29f811c1859ba18e05b8c419cc94410c9a55/rules/windows/process_creation/proc_creation_win_svchost_masqueraded_execution.yml)
Expand All @@ -61,7 +67,7 @@ sigma convert -t logpoint -p logpoint_windows rules/windows/process_creation/pro
╭─ubuntu@ubuntu
╰─$ sigma convert -t logpoint -p logpoint_windows rules/windows/process_creation/proc_creation_win_svchost_masqueraded_execution.yml
Parsing Sigma rules [####################################] 100%
label="Create" label="Process" "process"="*\svchost.exe" - ("process" IN ["C:\Windows\System32\svchost.exe", "C:\Windows\SysWOW64\svchost.exe"] OR file="svchost.exe")
label="Create" label="Process" "process"="*\svchost.exe" -("process" IN ["C:\Windows\System32\svchost.exe", "C:\Windows\SysWOW64\svchost.exe"] OR file="svchost.exe")
```

## Limitations and Constraints
Expand Down
183 changes: 183 additions & 0 deletions examples/logpoint_conversion_samples.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
from textwrap import dedent

from sigma.backends.logpoint import Logpoint
from sigma.collection import SigmaCollection


SAMPLE_RULES = [
(
"Single selection",
"""
title: Simple equality
status: test
logsource:
product: test_product
category: test_category
detection:
sel:
fieldA: valueA
condition: sel
""",
),
(
"OR combination",
"""
title: OR combination
status: test
logsource:
product: test_product
category: test_category
detection:
sel1:
fieldA: valueA
sel2:
fieldB: valueB
condition: 1 of sel*
""",
),
(
"AND with lists",
"""
title: List expansion
status: test
logsource:
product: test_product
category: test_category
detection:
sel:
fieldA:
- valueA1
- valueA2
fieldB:
- valueB1
- valueB2
condition: sel
""",
),
(
"Field name with whitespace",
"""
title: Whitespace field
status: test
logsource:
product: test_product
category: test_category
detection:
sel:
field name: value
condition: sel
""",
),
(
"Compact NOT",
"""
title: Compact NOT
status: test
logsource:
product: test_product
category: test_category
detection:
sel1:
fieldA: valueA
sel2:
fieldB: valueB
condition: not (sel1 or sel2)
""",
),
(
"Null filter",
"""
title: Null filter
status: test
logsource:
product: test_product
category: test_category
detection:
filter:
fieldA: null
condition: not filter
""",
),
(
"Endswith and null filters",
"""
title: Endswith null filters
status: test
logsource:
product: test_product
category: test_category
detection:
selection:
FieldA|endswith: 'valueA'
filter_1:
FieldB: null
filter_2:
FieldB: ''
condition: selection and not filter_1 and not filter_2
""",
),
(
"CIDR",
"""
title: CIDR filter
status: test
logsource:
product: test_product
category: test_category
detection:
sel:
field|cidr:
- 192.168.0.0/16
- 10.0.0.0/8
fieldB: foo
fieldC: bar
condition: sel
""",
),
(
"Regex",
"""
title: Regex test
status: test
logsource:
product: test_product
category: test_category
detection:
sel:
fieldA|re: foo.*bar
fieldB: foo
condition: sel
""",
),
(
"Contains all",
"""
title: Contains all
status: test
logsource:
product: test_product
category: test_category
detection:
sel:
fieldA|contains|all:
- valueA
- valueB
condition: sel
""",
),
]


def convert_samples() -> None:
backend = Logpoint()
for index, (title, yaml_rule) in enumerate(SAMPLE_RULES, start=1):
collection = SigmaCollection.from_yaml(dedent(yaml_rule))
conversions = backend.convert(collection)
print(f"{index}. {title}:")
for conversion in conversions:
print(f" {conversion}")
print()


if __name__ == "__main__":
convert_samples()
8 changes: 8 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ pytest-cov = "^5.0.0"
coverage = "^7.5.3"
defusedxml = "^0.7.1"

[tool.poetry.plugins."sigma.backends"]
logpoint = "sigma.backends.logpoint:backends"

[tool.poetry.plugins."sigma.pipelines"]
logpoint_windows = "sigma.pipelines.logpoint:pipelines"
logpoint_o365 = "sigma.pipelines.logpoint:pipelines"
logpoint_azure = "sigma.pipelines.logpoint:pipelines"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
21 changes: 21 additions & 0 deletions sigma/backends/logpoint/logpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
)
from sigma.conversion.base import TextQueryBackend
from sigma.conversion.deferred import (
DeferredQueryExpression,
DeferredTextQueryExpression,
)
from sigma.conversion.state import ConversionState
Expand Down Expand Up @@ -271,3 +272,23 @@ def convert_condition(self, cond: ConditionType, state: ConversionState) -> Any:
elif isinstance(cond, ConditionFieldEqualsValueExpression):
self.modify_condition_from_json_value_construction(cond)
return super().convert_condition(cond, state)

def convert_condition_not(
self, cond: ConditionNOT, state: ConversionState
) -> Union[str, DeferredQueryExpression]:
"""Conversion of NOT conditions."""
arg = cond.args[0]
try:
if (
arg.__class__ in self.precedence
): # group if AND or OR condition is negated
grouped = self.convert_condition_group(arg, state)
return f"{self.not_token}{grouped.lstrip()}"
expr = self.convert_condition(arg, state)
if isinstance(
expr, DeferredQueryExpression
): # negate deferred expression and pass it to parent
return expr.negate()
return f"{self.not_token}{str(expr).lstrip()}" # convert negated expression to string
except TypeError: # pragma: no cover
raise NotImplementedError("Operator 'not' not supported by the backend")
57 changes: 49 additions & 8 deletions tests/test_backend_logpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,7 @@ def test_logpoint_not_filter_null_and(logpoint_backend: Logpoint):
"""
)

assert logpoint_backend.convert(rule) == [
'FieldA="*valueA" - FieldB!=* - FieldB=""'
]
assert logpoint_backend.convert(rule) == ['FieldA="*valueA" -FieldB!=* -FieldB=""']


def test_logpoint_filter_null_and(logpoint_backend: Logpoint):
Expand All @@ -229,7 +227,7 @@ def test_logpoint_filter_null_and(logpoint_backend: Logpoint):
"""
)

assert logpoint_backend.convert(rule) == ['FieldA="*valueA" FieldB!=* - FieldB=""']
assert logpoint_backend.convert(rule) == ['FieldA="*valueA" FieldB!=* -FieldB=""']


def test_logpoint_not_filter_null_or(logpoint_backend: Logpoint):
Expand All @@ -252,7 +250,7 @@ def test_logpoint_not_filter_null_or(logpoint_backend: Logpoint):
)

assert logpoint_backend.convert(rule) == [
'FieldA="*valueA" - FieldB!=* OR - FieldB=""'
'FieldA="*valueA" -FieldB!=* OR -FieldB=""'
]


Expand All @@ -276,7 +274,7 @@ def test_logpoint_filter_null_or(logpoint_backend: Logpoint):
)

assert logpoint_backend.convert(rule) == [
'FieldA="*valueA" FieldB!=* OR - FieldB=""'
'FieldA="*valueA" FieldB!=* OR -FieldB=""'
]


Expand All @@ -300,7 +298,7 @@ def test_logpoint_filter_not_or_null(logpoint_backend: Logpoint):
)

assert logpoint_backend.convert(rule) == [
'FieldA="*valueA" - (FieldB!=* OR FieldB="")'
'FieldA="*valueA" -(FieldB!=* OR FieldB="")'
]


Expand All @@ -319,7 +317,50 @@ def test_logpoint_filter_not(logpoint_backend: Logpoint):
"""
)

assert logpoint_backend.convert(rule) == ["- Field!=*"]
assert logpoint_backend.convert(rule) == ["-Field!=*"]


def test_logpoint_compact_not_grouping(logpoint_backend: Logpoint):
rule = SigmaCollection.from_yaml(
"""
title: Test
status: test
logsource:
category: test_category
product: test_product
detection:
sel1:
fieldA: valueA
sel2:
fieldB: valueB
condition: not (sel1 or sel2)
"""
)

assert logpoint_backend.convert(rule) == ['-(fieldA="valueA" OR fieldB="valueB")']


def test_logpoint_not_spacing(logpoint_backend: Logpoint):
rule = SigmaCollection.from_yaml(
"""
title: Test NOT spacing
status: test
logsource:
category: test_category
product: test_product
detection:
sel1:
fieldA: valueA
sel2:
fieldB: valueB
condition: not (sel1 or sel2)
"""
)

converted = logpoint_backend.convert(rule)[0]

assert "- (" not in converted
assert converted.startswith("-(")


def test_logpoint_angle_brackets(logpoint_backend: Logpoint):
Expand Down
2 changes: 1 addition & 1 deletion tests/test_pipelines_windows.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ def test_logpoint_windows_variable_mapping_registry_event():
)
)
== [
'event_id IN [12, 13, 14] norm_id="WindowsSysmon" target_object="*System\CurrentControlSet\Services\VSS*" "process"="*esentutl.exe" - target_object="*System\CurrentControlSet\Services\VSS\Start*"'
'event_id IN [12, 13, 14] norm_id="WindowsSysmon" target_object="*System\CurrentControlSet\Services\VSS*" "process"="*esentutl.exe" -target_object="*System\CurrentControlSet\Services\VSS\Start*"'
]
)

Expand Down