From c742ccbd4bcf5ef828440d9f4a49434031ac033e Mon Sep 17 00:00:00 2001 From: msm Date: Thu, 14 Nov 2024 17:26:52 +0100 Subject: [PATCH 1/7] Small error handling tweaks --- src/api.py | 3 +++ src/model.py | 19 +++++++++++++------ src/modules.py | 30 +++++++++++++++++++++--------- src/run.py | 12 +++++++++++- 4 files changed, 48 insertions(+), 16 deletions(-) diff --git a/src/api.py b/src/api.py index 92ef033..804b78b 100644 --- a/src/api.py +++ b/src/api.py @@ -6,9 +6,12 @@ def parse(family: str, config: Dict[str, Any]) -> IocCollection: iocs = IocCollection() + + # Handlers for special-cased families if family in modules.modules: iocs = modules.modules[family](config) + # Generic parser for the rest of the fields modules.parse(config, iocs) return iocs diff --git a/src/model.py b/src/model.py index 1425eda..c7e175c 100644 --- a/src/model.py +++ b/src/model.py @@ -2,6 +2,7 @@ from base64 import b64encode from enum import Enum from typing import List, Optional, Tuple, Union, cast +import logging from urllib.parse import urlparse from Cryptodome.PublicKey import RSA # type: ignore @@ -10,6 +11,8 @@ from .errors import IocExtractError +log = logging.getLogger(__name__) + PUBKEY_PEM_TEMPLATE = ( "-----BEGIN PUBLIC KEY-----\n{}\n-----END PUBLIC KEY-----" ) @@ -230,7 +233,7 @@ def try_add_rsa_from_pem(self, pem: str) -> None: if pem: self.add_rsa_key(RsaKey.parse_pem(pem)) except IocExtractError: - pass + log.warn("Failed to parse a RSA key from PEM") def try_add_rsa_from_asn1_bytes(self, blob: bytes) -> None: pem = PUBKEY_PEM_TEMPLATE.format(b64encode(blob).decode()) @@ -238,13 +241,13 @@ def try_add_rsa_from_asn1_bytes(self, blob: bytes) -> None: try: self.add_rsa_key(RsaKey.parse_pem(pem)) except IocExtractError: - pass + log.warn("Failed to parse a RSA key from ASN1") def try_add_rsa_from_base64(self, pem: str) -> None: try: self.add_rsa_key(RsaKey.parse_base64(pem)) except IocExtractError: - pass + log.warn("Failed to parse a RSA key from base64") def add_network_location(self, netloc: NetworkLocation) -> None: self.network_locations.append(netloc) @@ -253,13 +256,17 @@ def add_host_port( self, host: str, port: Union[str, int], schema: str = "unknown" ) -> None: if isinstance(port, str): - port_val = int(port) + try: + port_val = int(port) + except ValueError: + log.warn("Failed to add URL from host_port") + return else: port_val = port try: self.try_add_url(f"{schema}://{host}:{port_val}") except IocExtractError: - pass + log.warn("Failed to add URL from host_port") def try_add_url( self, url: str, location_type: LocationType = LocationType.CNC @@ -271,7 +278,7 @@ def try_add_url( NetworkLocation(url, location_type=location_type) ) except IocExtractError: - pass + log.warn("Failed to add URL directly") def add_password(self, password: str) -> None: self.passwords.append(password) diff --git a/src/modules.py b/src/modules.py index 63ee69f..cff03e2 100644 --- a/src/modules.py +++ b/src/modules.py @@ -1,15 +1,26 @@ import string from base64 import b64decode from typing import Any, Dict, List +import logging -from .errors import ModuleAlreadyRegisteredError +from .errors import ModuleAlreadyRegisteredError, IocExtractError from .model import EcdsaCurve, IocCollection, LocationType, RsaKey modules: Dict[str, Any] = {} +log = logging.getLogger(__name__) -# Utils +class CantFindAHostForTheDomain(IocExtractError): + """Can't find a host for the domain when adding url.""" + pass +class DomainHasToBeAStringOrADict(IocExtractError): + """Adding URL from something other than string or a dict.""" + pass + +class UnknownRsaKeyType(IocExtractError): + """Can't guess the RSA key format.""" + pass def module(name): def decorator(func): @@ -50,11 +61,9 @@ def add_url(iocs: IocCollection, config: Dict[str, Any], key: str) -> None: iocs.try_add_url(domain[hostkey]) break else: - raise NotImplementedError("Can't find a host for the domain") + raise CantFindAHostForTheDomain() else: - raise NotImplementedError( - "The domain has to be either a string or a list" - ) + raise DomainHasToBeAStringOrADict() def add_rsa_key(iocs: IocCollection, config: Dict, key: str) -> None: @@ -87,7 +96,7 @@ def add_rsa_key(iocs: IocCollection, config: Dict, key: str) -> None: iocs.try_add_rsa_from_asn1_bytes(enc_bytes.rstrip(b"\x00")) continue - raise NotImplementedError("Unknown RSA key type") + raise UnknownRsaKeyType() def add_key(iocs: IocCollection, config: Dict, key: str, keytype: str) -> None: @@ -106,7 +115,10 @@ def add_mutex(iocs: IocCollection, config: Dict, key: str) -> None: def parse(config: Dict[str, Any], iocs: IocCollection) -> None: for name in ["publickey", "rsapub", "rsakey", "pubkey", "privkey"]: - add_rsa_key(iocs, config, name) + try: + add_rsa_key(iocs, config, name) + except UnknownRsaKeyType: + log.warn("Unknown RSA key type") for name in [ "urls", @@ -372,7 +384,7 @@ def parse_lockbit(config: Dict[str, Any]) -> IocCollection: iocs.add_rsa_key(RsaKey(n=n, e=e)) del config["rsa_pub"] except Exception: - pass + log.warn("Failed to parse a lockbit key") return iocs diff --git a/src/run.py b/src/run.py index dea59f8..07fe7da 100644 --- a/src/run.py +++ b/src/run.py @@ -1,4 +1,5 @@ import argparse +import logging from mwdblib import MWDB # type: ignore @@ -14,8 +15,16 @@ def main(): parser.add_argument( "config_id", help="Config to parse", default=None, nargs="?" ) + parser.add_argument( + "-v", "--verbose", help="Print debug logs", action="store_true" + ) args = parser.parse_args() + if args.verbose: + logging.basicConfig(level=logging.DEBUG) + else: + logging.basicConfig(level=logging.INFO) + mwdb = MWDB() mwdb.login(args.mwdb_user, args.mwdb_pass) @@ -28,9 +37,10 @@ def main(): for cfg in mwdb.recent_configs(): if cfg.type != "static": continue - print(cfg.id) + print(cfg.family, cfg.id) iocs = parse(cfg.family, cfg.cfg) print(iocs.prettyprint()) + print() continue From ef21fa1b66636cc38a1af807694e20d7f9a0850f Mon Sep 17 00:00:00 2001 From: msm Date: Thu, 14 Nov 2024 17:31:38 +0100 Subject: [PATCH 2/7] Fix isort --- src/model.py | 2 +- src/modules.py | 4 ++-- tests/download_test_data.py | 3 ++- tests/reparse_file.py | 3 ++- tests/test_parse_regression.py | 5 +++-- 5 files changed, 10 insertions(+), 7 deletions(-) diff --git a/src/model.py b/src/model.py index c7e175c..76946e7 100644 --- a/src/model.py +++ b/src/model.py @@ -1,8 +1,8 @@ +import logging import re from base64 import b64encode from enum import Enum from typing import List, Optional, Tuple, Union, cast -import logging from urllib.parse import urlparse from Cryptodome.PublicKey import RSA # type: ignore diff --git a/src/modules.py b/src/modules.py index cff03e2..dcbead8 100644 --- a/src/modules.py +++ b/src/modules.py @@ -1,9 +1,9 @@ +import logging import string from base64 import b64decode from typing import Any, Dict, List -import logging -from .errors import ModuleAlreadyRegisteredError, IocExtractError +from .errors import IocExtractError, ModuleAlreadyRegisteredError from .model import EcdsaCurve, IocCollection, LocationType, RsaKey modules: Dict[str, Any] = {} diff --git a/tests/download_test_data.py b/tests/download_test_data.py index d2c22b5..06a69ef 100644 --- a/tests/download_test_data.py +++ b/tests/download_test_data.py @@ -1,6 +1,7 @@ import argparse -import os import json +import os + from mwdblib import Malwarecage, MalwarecageConfig # type: ignore diff --git a/tests/reparse_file.py b/tests/reparse_file.py index 892a4a4..5f13dad 100644 --- a/tests/reparse_file.py +++ b/tests/reparse_file.py @@ -1,6 +1,7 @@ import argparse -import os import json +import os + from src.api import parse diff --git a/tests/test_parse_regression.py b/tests/test_parse_regression.py index fab2cc7..0fbb9d6 100644 --- a/tests/test_parse_regression.py +++ b/tests/test_parse_regression.py @@ -1,7 +1,8 @@ -import unittest import json -from src.api import parse import os +import unittest + +from src.api import parse class TestParseRegression(unittest.TestCase): From f2c6280fd0ad0967d1daf062a85f47b768cc9246 Mon Sep 17 00:00:00 2001 From: msm Date: Thu, 14 Nov 2024 17:35:47 +0100 Subject: [PATCH 3/7] Fix black --- src/model.py | 2 +- src/modules.py | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/src/model.py b/src/model.py index 76946e7..3c80e3c 100644 --- a/src/model.py +++ b/src/model.py @@ -259,7 +259,7 @@ def add_host_port( try: port_val = int(port) except ValueError: - log.warn("Failed to add URL from host_port") + log.warn("Failed to add URL from host_port") return else: port_val = port diff --git a/src/modules.py b/src/modules.py index dcbead8..85d5cd8 100644 --- a/src/modules.py +++ b/src/modules.py @@ -12,16 +12,22 @@ class CantFindAHostForTheDomain(IocExtractError): """Can't find a host for the domain when adding url.""" + pass + class DomainHasToBeAStringOrADict(IocExtractError): """Adding URL from something other than string or a dict.""" + pass + class UnknownRsaKeyType(IocExtractError): """Can't guess the RSA key format.""" + pass + def module(name): def decorator(func): if name in modules: From a9892706f810a39e7fb42b90e2ff8077c4d2ad26 Mon Sep 17 00:00:00 2001 From: msm-cert <156842376+msm-cert@users.noreply.github.com> Date: Sat, 28 Dec 2024 15:04:43 +0000 Subject: [PATCH 4/7] Update src/model.py --- src/model.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/model.py b/src/model.py index 3c80e3c..f905ecb 100644 --- a/src/model.py +++ b/src/model.py @@ -233,7 +233,7 @@ def try_add_rsa_from_pem(self, pem: str) -> None: if pem: self.add_rsa_key(RsaKey.parse_pem(pem)) except IocExtractError: - log.warn("Failed to parse a RSA key from PEM") + log.warning("Failed to parse a RSA key from PEM") def try_add_rsa_from_asn1_bytes(self, blob: bytes) -> None: pem = PUBKEY_PEM_TEMPLATE.format(b64encode(blob).decode()) From eb7f2e612a0145fbae35deb395eb3aa33c34a6e4 Mon Sep 17 00:00:00 2001 From: msm Date: Sat, 28 Dec 2024 16:06:09 +0100 Subject: [PATCH 5/7] fix exception names --- src/modules.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/modules.py b/src/modules.py index 85d5cd8..931f5fd 100644 --- a/src/modules.py +++ b/src/modules.py @@ -10,13 +10,13 @@ log = logging.getLogger(__name__) -class CantFindAHostForTheDomain(IocExtractError): +class CantFindHostForDomain(IocExtractError): """Can't find a host for the domain when adding url.""" pass -class DomainHasToBeAStringOrADict(IocExtractError): +class InvalidDomainObject(IocExtractError): """Adding URL from something other than string or a dict.""" pass @@ -67,9 +67,9 @@ def add_url(iocs: IocCollection, config: Dict[str, Any], key: str) -> None: iocs.try_add_url(domain[hostkey]) break else: - raise CantFindAHostForTheDomain() + raise CantFindHostForDomain() else: - raise DomainHasToBeAStringOrADict() + raise InvalidDomainObject() def add_rsa_key(iocs: IocCollection, config: Dict, key: str) -> None: From b57845139abcc358e52535e936b0933ed5d9f05f Mon Sep 17 00:00:00 2001 From: msm-cert <156842376+msm-cert@users.noreply.github.com> Date: Mon, 30 Dec 2024 12:09:09 +0000 Subject: [PATCH 6/7] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Michał Praszmo --- src/model.py | 8 ++++---- src/modules.py | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/model.py b/src/model.py index f905ecb..c47a476 100644 --- a/src/model.py +++ b/src/model.py @@ -247,7 +247,7 @@ def try_add_rsa_from_base64(self, pem: str) -> None: try: self.add_rsa_key(RsaKey.parse_base64(pem)) except IocExtractError: - log.warn("Failed to parse a RSA key from base64") + log.warning("Failed to parse a RSA key from base64") def add_network_location(self, netloc: NetworkLocation) -> None: self.network_locations.append(netloc) @@ -259,14 +259,14 @@ def add_host_port( try: port_val = int(port) except ValueError: - log.warn("Failed to add URL from host_port") + log.warning("Failed to add URL from host_port") return else: port_val = port try: self.try_add_url(f"{schema}://{host}:{port_val}") except IocExtractError: - log.warn("Failed to add URL from host_port") + log.warning("Failed to add URL from host_port") def try_add_url( self, url: str, location_type: LocationType = LocationType.CNC @@ -278,7 +278,7 @@ def try_add_url( NetworkLocation(url, location_type=location_type) ) except IocExtractError: - log.warn("Failed to add URL directly") + log.warning("Failed to add URL directly") def add_password(self, password: str) -> None: self.passwords.append(password) diff --git a/src/modules.py b/src/modules.py index 931f5fd..0cc8bdf 100644 --- a/src/modules.py +++ b/src/modules.py @@ -124,7 +124,7 @@ def parse(config: Dict[str, Any], iocs: IocCollection) -> None: try: add_rsa_key(iocs, config, name) except UnknownRsaKeyType: - log.warn("Unknown RSA key type") + log.warning("Unknown RSA key type") for name in [ "urls", @@ -390,7 +390,7 @@ def parse_lockbit(config: Dict[str, Any]) -> IocCollection: iocs.add_rsa_key(RsaKey(n=n, e=e)) del config["rsa_pub"] except Exception: - log.warn("Failed to parse a lockbit key") + log.warning("Failed to parse a lockbit key") return iocs From aeac328394cc49e448f32af082fb0bc11eb68cfe Mon Sep 17 00:00:00 2001 From: msm-cert <156842376+msm-cert@users.noreply.github.com> Date: Mon, 30 Dec 2024 12:10:57 +0000 Subject: [PATCH 7/7] Update src/model.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Michał Praszmo --- src/model.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/model.py b/src/model.py index c47a476..8d45086 100644 --- a/src/model.py +++ b/src/model.py @@ -241,7 +241,7 @@ def try_add_rsa_from_asn1_bytes(self, blob: bytes) -> None: try: self.add_rsa_key(RsaKey.parse_pem(pem)) except IocExtractError: - log.warn("Failed to parse a RSA key from ASN1") + log.warning("Failed to parse a RSA key from ASN1") def try_add_rsa_from_base64(self, pem: str) -> None: try: