Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Small error handling tweaks #37

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@

def parse(family: str, config: Dict[str, Any]) -> IocCollection:
iocs = IocCollection()

# Handlers for special-cased families
if family in modules.modules:
iocs = modules.modules[family](config)

# Generic parser for the rest of the fields
modules.parse(config, iocs)

return iocs
19 changes: 13 additions & 6 deletions src/model.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import re
from base64 import b64encode
from enum import Enum
Expand All @@ -10,6 +11,8 @@

from .errors import IocExtractError

log = logging.getLogger(__name__)

PUBKEY_PEM_TEMPLATE = (
"-----BEGIN PUBLIC KEY-----\n{}\n-----END PUBLIC KEY-----"
)
Expand Down Expand Up @@ -230,21 +233,21 @@ def try_add_rsa_from_pem(self, pem: str) -> None:
if pem:
self.add_rsa_key(RsaKey.parse_pem(pem))
except IocExtractError:
pass
log.warn("Failed to parse a RSA key from PEM")
msm-cert marked this conversation as resolved.
Show resolved Hide resolved

def try_add_rsa_from_asn1_bytes(self, blob: bytes) -> None:
pem = PUBKEY_PEM_TEMPLATE.format(b64encode(blob).decode())

try:
self.add_rsa_key(RsaKey.parse_pem(pem))
except IocExtractError:
pass
log.warn("Failed to parse a RSA key from ASN1")
msm-cert marked this conversation as resolved.
Show resolved Hide resolved

def try_add_rsa_from_base64(self, pem: str) -> None:
try:
self.add_rsa_key(RsaKey.parse_base64(pem))
except IocExtractError:
pass
log.warn("Failed to parse a RSA key from base64")
msm-cert marked this conversation as resolved.
Show resolved Hide resolved

def add_network_location(self, netloc: NetworkLocation) -> None:
self.network_locations.append(netloc)
Expand All @@ -253,13 +256,17 @@ def add_host_port(
self, host: str, port: Union[str, int], schema: str = "unknown"
) -> None:
if isinstance(port, str):
port_val = int(port)
try:
port_val = int(port)
except ValueError:
log.warn("Failed to add URL from host_port")
msm-cert marked this conversation as resolved.
Show resolved Hide resolved
return
else:
port_val = port
try:
self.try_add_url(f"{schema}://{host}:{port_val}")
except IocExtractError:
pass
log.warn("Failed to add URL from host_port")
msm-cert marked this conversation as resolved.
Show resolved Hide resolved

def try_add_url(
self, url: str, location_type: LocationType = LocationType.CNC
Expand All @@ -271,7 +278,7 @@ def try_add_url(
NetworkLocation(url, location_type=location_type)
)
except IocExtractError:
pass
log.warn("Failed to add URL directly")
msm-cert marked this conversation as resolved.
Show resolved Hide resolved

def add_password(self, password: str) -> None:
self.passwords.append(password)
Expand Down
36 changes: 27 additions & 9 deletions src/modules.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,31 @@
import logging
import string
from base64 import b64decode
from typing import Any, Dict, List

from .errors import ModuleAlreadyRegisteredError
from .errors import IocExtractError, ModuleAlreadyRegisteredError
from .model import EcdsaCurve, IocCollection, LocationType, RsaKey

modules: Dict[str, Any] = {}
log = logging.getLogger(__name__)


# Utils
class CantFindAHostForTheDomain(IocExtractError):
msm-cert marked this conversation as resolved.
Show resolved Hide resolved
"""Can't find a host for the domain when adding url."""

pass


class DomainHasToBeAStringOrADict(IocExtractError):
"""Adding URL from something other than string or a dict."""

pass


class UnknownRsaKeyType(IocExtractError):
"""Can't guess the RSA key format."""

pass


def module(name):
Expand Down Expand Up @@ -50,11 +67,9 @@ def add_url(iocs: IocCollection, config: Dict[str, Any], key: str) -> None:
iocs.try_add_url(domain[hostkey])
break
else:
raise NotImplementedError("Can't find a host for the domain")
raise CantFindAHostForTheDomain()
else:
raise NotImplementedError(
"The domain has to be either a string or a list"
)
raise DomainHasToBeAStringOrADict()


def add_rsa_key(iocs: IocCollection, config: Dict, key: str) -> None:
Expand Down Expand Up @@ -87,7 +102,7 @@ def add_rsa_key(iocs: IocCollection, config: Dict, key: str) -> None:
iocs.try_add_rsa_from_asn1_bytes(enc_bytes.rstrip(b"\x00"))
continue

raise NotImplementedError("Unknown RSA key type")
raise UnknownRsaKeyType()


def add_key(iocs: IocCollection, config: Dict, key: str, keytype: str) -> None:
Expand All @@ -106,7 +121,10 @@ def add_mutex(iocs: IocCollection, config: Dict, key: str) -> None:

def parse(config: Dict[str, Any], iocs: IocCollection) -> None:
for name in ["publickey", "rsapub", "rsakey", "pubkey", "privkey"]:
add_rsa_key(iocs, config, name)
try:
add_rsa_key(iocs, config, name)
except UnknownRsaKeyType:
log.warn("Unknown RSA key type")
msm-cert marked this conversation as resolved.
Show resolved Hide resolved

for name in [
"urls",
Expand Down Expand Up @@ -372,7 +390,7 @@ def parse_lockbit(config: Dict[str, Any]) -> IocCollection:
iocs.add_rsa_key(RsaKey(n=n, e=e))
del config["rsa_pub"]
except Exception:
pass
log.warn("Failed to parse a lockbit key")
msm-cert marked this conversation as resolved.
Show resolved Hide resolved

return iocs

Expand Down
12 changes: 11 additions & 1 deletion src/run.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import argparse
import logging

from mwdblib import MWDB # type: ignore

Expand All @@ -14,8 +15,16 @@ def main():
parser.add_argument(
"config_id", help="Config to parse", default=None, nargs="?"
)
parser.add_argument(
"-v", "--verbose", help="Print debug logs", action="store_true"
)
args = parser.parse_args()

if args.verbose:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)

mwdb = MWDB()
mwdb.login(args.mwdb_user, args.mwdb_pass)

Expand All @@ -28,9 +37,10 @@ def main():
for cfg in mwdb.recent_configs():
if cfg.type != "static":
continue
print(cfg.id)
print(cfg.family, cfg.id)
iocs = parse(cfg.family, cfg.cfg)
print(iocs.prettyprint())
print()
continue


Expand Down
3 changes: 2 additions & 1 deletion tests/download_test_data.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import argparse
import os
import json
import os

from mwdblib import Malwarecage, MalwarecageConfig # type: ignore


Expand Down
3 changes: 2 additions & 1 deletion tests/reparse_file.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import argparse
import os
import json
import os

from src.api import parse


Expand Down
5 changes: 3 additions & 2 deletions tests/test_parse_regression.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import unittest
import json
from src.api import parse
import os
import unittest

from src.api import parse


class TestParseRegression(unittest.TestCase):
Expand Down
Loading