-
-
Notifications
You must be signed in to change notification settings - Fork 109
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Parses Sigma rule without: * further parsing of conditions * modifier classes * rule collections
- Loading branch information
0 parents
commit 919d3bd
Showing
9 changed files
with
1,005 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
.coverage | ||
.vscode/ | ||
**/__pycache__ |
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
[tool.poetry] | ||
name = "sigmatools" | ||
version = "1.0.0" | ||
description = "Sigma rule processing and conversion tools" | ||
authors = ["Thomas Patzke <[email protected]>"] | ||
|
||
[tool.poetry.dependencies] | ||
python = "^3.8" | ||
pyyaml = "^5.3.1" | ||
|
||
[tool.poetry.dev-dependencies] | ||
pytest = "^5.2" | ||
pytest-cov = "^2.10.0" | ||
pytest-mypy = "^0.6.2" | ||
pylint = "^2.6.0" | ||
|
||
[build-system] | ||
requires = ["poetry>=1.1.2"] | ||
build-backend = "poetry.masonry.api" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
__version__ = '1.0.0' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
class SigmaModifierBase(object): | ||
identifier : str = "base" | ||
active : bool = False | ||
|
||
def apply(self): | ||
raise SigmaNotImplementedError("Invalid attempt to apply base value modifier.") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,245 @@ | ||
from dataclasses import dataclass | ||
from typing import Optional, Union, Sequence, List, Mapping, TypeVar, Type | ||
from uuid import UUID | ||
from enum import Enum | ||
from datetime import date | ||
import yaml | ||
|
||
SigmaStatus = Enum("SigmaStatus", "stable experimental test") | ||
SigmaLevel = Enum("SigmaLevel", "low medium high critical") | ||
|
||
@dataclass | ||
class SigmaRuleTag: | ||
namespace : str | ||
name : str | ||
|
||
@classmethod | ||
def from_str(cls, tag : str) -> "SigmaRuleTag": | ||
"""Build SigmaRuleTag class from plain text tag string.""" | ||
ns, n = tag.split(".", maxsplit=1) | ||
return cls(ns, n) | ||
|
||
@dataclass | ||
class SigmaLogSource: | ||
category : Optional[str] | ||
product : Optional[str] | ||
service : Optional[str] | ||
|
||
def __post_init__(self): | ||
"""Ensures that log source is not empty.""" | ||
if self.category == None and self.product == None and self.service == None: | ||
raise SigmaError("Sigma log source can't be empty") | ||
|
||
@classmethod | ||
def from_dict(cls, logsource : dict) -> "SigmaLogSource": | ||
"""Returns SigmaLogSource object from dict with fields.""" | ||
return cls( | ||
logsource.get("category"), | ||
logsource.get("product"), | ||
logsource.get("service"), | ||
) | ||
|
||
@dataclass | ||
class SigmaDetectionItem: | ||
""" | ||
Single Sigma detection definition | ||
A detection consists of: | ||
* an optional field name | ||
* a list of value modifiers that can also be empty | ||
* the mandatory value or a list of values (internally it's always a list of values) | ||
""" | ||
field : Optional[str] # if None, this is a keyword argument not bound to a field | ||
modifiers : List[str] | ||
value : List[Union[int, str]] | ||
|
||
@classmethod | ||
def from_mapping( | ||
cls, | ||
key : Optional[str], | ||
val : Union[ | ||
List[Union[int, str]], | ||
Union[int, str], | ||
] | ||
) -> "SigmaDetectionItem": | ||
""" | ||
Constructs SigmaDetectionItem object from a mapping between field name containing | ||
modifiers and a value. This also supports keys containing only value modifiers | ||
which results in a keyword detection. | ||
The value accepts plain values as well as lists of values and resolves them into | ||
the value list always contained in a SigmaDetectionItem instance. | ||
""" | ||
if key is None: # no key at all means pure keyword detection without value modifiers | ||
field = None | ||
modifiers = list() | ||
else: # key-value detection | ||
field, *modifiers = key.split("|") | ||
if field == "": | ||
field = None | ||
|
||
if isinstance(val, (int, str)): # value is plain, convert into single element list | ||
val = [val] | ||
|
||
return cls(field, modifiers, val) | ||
|
||
@classmethod | ||
def from_value( | ||
cls, | ||
val : Union[ | ||
List[Union[int, str]], | ||
Union[int, str], | ||
] | ||
) -> "SigmaDetectionItem": | ||
"""Convenience method for from_mapping(None, value).""" | ||
return cls.from_mapping(None, val) | ||
|
||
@dataclass | ||
class SigmaDetection: | ||
""" | ||
A detection is a set of atomic event defitionions represented by SigmaDetectionItem instances. SigmaDetectionItems | ||
of a SigmaDetection are OR-linked. | ||
A detection can be defined by: | ||
1. a mapping between field/value pairs that all should appear in matched events. | ||
2. a list of plain values or mappings defined and matched as in 1 where at least one of the items should appear in matched events. | ||
""" | ||
detection_items : List[Union[SigmaDetectionItem, "SigmaDetection"]] | ||
|
||
@classmethod | ||
def from_definition(cls, definition : Union[Mapping, Sequence]) -> "SigmaDetection": | ||
"""Instantiate an appropriate SigmaDetection object from a parsed Sigma detection definition.""" | ||
if isinstance(definition, Mapping): # key-value-definition (case 1) | ||
return cls( | ||
detection_items=[ | ||
SigmaDetectionItem.from_mapping(key, val) | ||
for key, val in definition.items() | ||
]) | ||
elif isinstance(definition, Sequence): # list of items (case 2) | ||
return cls( | ||
detection_items=[ | ||
SigmaDetectionItem.from_value(item) if isinstance(item, (str, int)) # SigmaDetectionItem in case of a plain value or a list of plain values | ||
else SigmaDetection.from_definition(item) # nested SigmaDetection in other cases | ||
for item in definition | ||
] | ||
) | ||
|
||
@dataclass | ||
class SigmaDetections: | ||
"""Sigma detection section including named detections and condition.""" | ||
detections : Mapping[str, List[SigmaDetection]] | ||
condition : List[str] | ||
|
||
def __post_init__(self): | ||
"""Detections sanity checks""" | ||
if self.detections == dict(): | ||
raise SigmaError("No detections defined in Sigma rule") | ||
|
||
@classmethod | ||
def from_dict(cls, detections : dict) -> "SigmaDetections": | ||
try: | ||
if isinstance(detections["condition"], list): | ||
condition = detections["condition"] | ||
else: | ||
condition = [ detections["condition"] ] | ||
except KeyError: | ||
raise SigmaError("Sigma rule must contain at least one condition") | ||
del detections["condition"] | ||
|
||
return cls( | ||
detections={ | ||
name: SigmaDetection.from_definition(definition) | ||
for name, definition in detections.items() | ||
}, | ||
condition=condition, | ||
) | ||
|
||
@dataclass | ||
class SigmaRule: | ||
title : str | ||
id : Optional[UUID] | ||
status : Optional[SigmaStatus] | ||
description : Optional[str] | ||
references : List[str] | ||
tags : Optional[List[SigmaRuleTag]] | ||
author : Optional[str] | ||
date : date | ||
logsource : SigmaLogSource | ||
detection : SigmaDetections | ||
fields : Optional[List[str]] | ||
falsepositives : Optional[List[str]] | ||
level : Optional[SigmaLevel] | ||
|
||
@classmethod | ||
def from_dict(cls, rule : dict) -> "SigmaRule": | ||
"""Convert Sigma rule parsed in dict structure into SigmaRule object.""" | ||
# Rule identifier may be empty or must be valid UUID | ||
rule_id = rule.get("id") | ||
if rule_id is not None: | ||
try: | ||
rule_id = UUID(rule_id) | ||
except ValueError: | ||
raise SigmaError("Sigma rule identifier must be an UUID") | ||
|
||
# Rule level validation | ||
level = rule.get("level") | ||
try: | ||
level = SigmaLevel[level] | ||
except KeyError: | ||
raise SigmaError(f"'{ level }' is no valid Sigma rule level") | ||
|
||
# Rule status validation | ||
status = rule.get("status") | ||
try: | ||
status = SigmaStatus[status] | ||
except KeyError: | ||
raise SigmaError(f"'{ status }' is no valid Sigma rule status") | ||
|
||
# parse rule date if existing | ||
rule_date = rule.get("date") | ||
if rule_date is not None: | ||
try: | ||
rule_date = date(*(int(i) for i in rule_date.split("/"))) | ||
except ValueError: | ||
try: | ||
rule_date = date(*(int(i) for i in rule_date.split("-"))) | ||
except ValueError: | ||
raise SigmaError(f"Rule date '{ rule_date }' is invalid, must be yyyy/mm/dd or yyyy-mm-dd") | ||
|
||
# parse log source | ||
try: | ||
logsource = SigmaLogSource.from_dict(rule["logsource"]) | ||
except KeyError: | ||
raise SigmaError("Sigma rule must have a log source") | ||
|
||
# parse detections | ||
try: | ||
detections = SigmaDetections.from_dict(rule["detection"]) | ||
except KeyError: | ||
raise SigmaError("Sigma rule must have a detection definitions") | ||
|
||
return cls( | ||
title = rule["title"], | ||
id = rule_id, | ||
level = level, | ||
status = status, | ||
description = rule.get("description"), | ||
references = rule.get("references"), | ||
tags = [ SigmaRuleTag.from_str(tag) for tag in rule.get("tags", list()) ], | ||
author = rule.get("author"), | ||
date = rule_date, | ||
logsource = logsource, | ||
detection = detections, | ||
fields = rule.get("fields"), | ||
falsepositives = rule.get("falsepositives"), | ||
) | ||
|
||
@classmethod | ||
def from_yaml(cls, rule : str) -> "SigmaRule": | ||
"""Convert YAML input string with single document into SigmaRule object.""" | ||
parsed_rule = yaml.safe_load(rule) | ||
return cls.from_dict(parsed_rule) | ||
|
||
class SigmaError(ValueError): | ||
pass |
Empty file.
Oops, something went wrong.