diff --git a/requirements.txt b/requirements.txt index 55e1a4e..79628b1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ pycryptodomex malduck pymisp -mwdblib>=3.3.0 \ No newline at end of file +mwdblib>=3.3.0 +maco diff --git a/src/api.py b/src/api.py index 92ef033..621ab5f 100644 --- a/src/api.py +++ b/src/api.py @@ -5,10 +5,13 @@ def parse(family: str, config: Dict[str, Any]) -> IocCollection: - iocs = IocCollection() + """Parse a mwdb static config of the given family, and get a IocCollection + + :param family: Family this config belongs to + :param config: MWDB configuration dict""" + iocs = IocCollection(family) if family in modules.modules: - iocs = modules.modules[family](config) + modules.modules[family](config, iocs) modules.parse(config, iocs) - return iocs diff --git a/src/model.py b/src/model.py index 1425eda..719126b 100644 --- a/src/model.py +++ b/src/model.py @@ -5,6 +5,7 @@ from urllib.parse import urlparse from Cryptodome.PublicKey import RSA # type: ignore +from maco import model # type: ignore from malduck import base64, rsa # type: ignore from pymisp import MISPAttribute, MISPObject # type: ignore @@ -201,8 +202,9 @@ def prettyprint(self) -> str: class IocCollection: """Represents a collection of parsed IoCs""" - def __init__(self) -> None: + def __init__(self, family: str) -> None: """Creates an empty IocCollection instance""" + self.family = family self.rsa_keys: List[RsaKey] = [] self.ecdsa_curves: List[EcdsaCurve] = [] self.keys: List[Tuple[str, str]] = [] # (keytype, hexencoded key) @@ -331,9 +333,83 @@ def to_misp(self) -> List[MISPObject]: # filter out objects without any attributes to_return = list(filter(lambda x: bool(x.attributes), to_return)) - return to_return + def to_maco(self) -> model.ExtractorModel: + output = model.ExtractorModel(family=self.family) + + for rsakey in self.rsa_keys: + obj = model.ExtractorModel.Encryption( + algorithm="rsa", + public_key=str((rsakey.n, rsakey.e)), + ) + if rsakey.d: + obj.key = str((rsakey.n, rsakey.d)) + output.encryption.append(obj) + + for curve in self.ecdsa_curves: + output.encryption.append( + model.ExtractorModel.Encryption( + algorithm=curve.t, # for example, "ecdsa_pub_p384" + public_key=str((curve.x, curve.y)), + ) + ) + + for key in self.keys: + output.encryption.append( + model.ExtractorModel.Encryption( + algorithm=key[0], + key=key[1], + ) + ) + + for password in self.passwords: + output.password.append(password) + + def location_type_to_maco(location_type: LocationType) -> str: + if location_type in [LocationType.CNC, LocationType.PANEL]: + # Panel is not 100% technically correct here + return "c2" + elif location_type == LocationType.DOWNLOAD_URL: + return "download" + elif location_type in [LocationType.OTHER, LocationType.PEER]: + return "other" + else: + raise ValueError(f"Unknown location type: {location_type}") + + for netloc in self.network_locations: + if netloc.scheme in ["https", "http"]: + output.http.append( + model.ExtractorModel.Http( + protocol=netloc.scheme, + uri=netloc.url.geturl(), + usage=location_type_to_maco(netloc.location_type), + ) + ) + else: + output.tcp.append( + model.ExtractorModel.Connection( + server_ip=netloc.url.hostname, + server_port=netloc.port, + ) + ) + + for mutex in self.mutexes: + output.mutex.append(mutex) + + for filename in self.dropped_filenames: + output.paths.append(filename) + + # Not supported by Maco + # for email in self.emails_to + self.emails_from: + # output.emails.append(email) + + # Not supported by Maco + # for message in self.ransom_messages: + # output.messages.append(message) + + return output.model_dump(exclude_defaults=True) + def prettyprint(self) -> str: """Pretty print for debugging""" result = [] diff --git a/src/modules.py b/src/modules.py index 63ee69f..dceb68a 100644 --- a/src/modules.py +++ b/src/modules.py @@ -161,16 +161,14 @@ def parse(config: Dict[str, Any], iocs: IocCollection) -> None: @module("netwire") -def parse_netwire(config: Dict[str, Any]) -> IocCollection: +def parse_netwire(config: Dict[str, Any], iocs: IocCollection) -> None: if "mutex" in config and isinstance(config["mutex"], bool): # netwire "mutex" is bool for some reason del config["mutex"] - return IocCollection() @module("quasarrat") -def parse_quasarrat(config: Dict[str, Any]) -> IocCollection: - iocs = IocCollection() +def parse_quasarrat(config: Dict[str, Any], iocs: IocCollection) -> None: if "encryption_key" in config: iocs.add_password(config["encryption_key"]) del config["encryption_key"] @@ -178,166 +176,129 @@ def parse_quasarrat(config: Dict[str, Any]) -> IocCollection: if "install_name" in config: iocs.add_drop_filename(config["install_name"]) - return iocs - @module("hawkeye") -def parse_hawkeye(config: Dict[str, Any]) -> IocCollection: - iocs = IocCollection() - +def parse_hawkeye(config: Dict[str, Any], iocs: IocCollection) -> None: if config.get("EmailUsername"): iocs.add_email_to(config["EmailUsername"]) - return iocs - @module("trickbot") -def parse_trickbot(config: Dict[str, Any]) -> IocCollection: - iocs = IocCollection() +def parse_trickbot(config: Dict[str, Any], iocs: IocCollection) -> None: if config.get("public_key"): ecdsa = config["public_key"] iocs.add_ecdsa_curve( EcdsaCurve(ecdsa["t"], int(ecdsa["x"]), int(ecdsa["y"])), ) del config["public_key"] - return iocs @module("ramnit") -def parse_ramnit(config: Dict[str, Any]) -> IocCollection: - iocs = IocCollection() +def parse_ramnit(config: Dict[str, Any], iocs: IocCollection) -> None: for domain in config.get("hardcoded_domain", []): iocs.try_add_url(domain) - return iocs @module("legionloader") -def parse_legionloader(config: Dict[str, Any]) -> IocCollection: - iocs = IocCollection() +def parse_legionloader(config: Dict[str, Any], iocs: IocCollection) -> None: if "stealer" in config: iocs.try_add_url(config["stealer"]) for drop in config.get("drops", []): iocs.try_add_url(drop, location_type=LocationType.DOWNLOAD_URL) - return iocs @module("panda") -def parse_panda(config: Dict[str, Any]) -> IocCollection: - iocs = IocCollection() +def parse_panda(config: Dict[str, Any], iocs: IocCollection) -> None: if "comm_public_key" in config: if type(config["comm_public_key"]) == str: iocs.try_add_rsa_from_pem(config["comm_public_key"]) elif type(config["comm_public_key"]) == dict: key = config["comm_public_key"] iocs.add_rsa_key(RsaKey(int(key["n"]), int(key["e"]))) - return iocs @module("danabot") -def parse_vjworm(config: Dict[str, Any]) -> IocCollection: - iocs = IocCollection() +def parse_vjworm(config: Dict[str, Any], iocs: IocCollection) -> None: if "rsa_key" in config: iocs.try_add_rsa_from_base64(config["rsa_key"]) del config["rsa_key"] - return iocs @module("nymaim") -def parse_nymaim(config: Dict[str, Any]) -> IocCollection: - iocs = IocCollection() +def parse_nymaim(config: Dict[str, Any], iocs: IocCollection) -> None: if "urls" in config: for url in config["urls"]: url = url.replace("]", "") # some mistakes cannot be unmade iocs.try_add_url(url) del config["urls"] - return iocs @module("zeus") -def parse_zeus(config: Dict[str, Any]) -> IocCollection: - iocs = IocCollection() +def parse_zeus(config: Dict[str, Any], iocs: IocCollection) -> None: if "rc4sbox" in config: iocs.add_key("rc4", config["rc4sbox"]) - return iocs @module("vmzeus") -def parse_vmzeus(config: Dict[str, Any]) -> IocCollection: - iocs = IocCollection() +def parse_vmzeus(config: Dict[str, Any], iocs: IocCollection) -> None: if "rc4sbox" in config: iocs.add_key("rc4", config["rc4sbox"]) if "rc6sbox" in config: iocs.add_key("rc6", config["rc6sbox"]) - return iocs @module("sendsafe") -def parse_sendsafe(config: Dict[str, Any]) -> IocCollection: - iocs = IocCollection() +def parse_sendsafe(config: Dict[str, Any], iocs: IocCollection) -> None: if "cnc" in config and "http_port" in config: iocs.add_host_port(config["cnc"], int(config["http_port"])) del config["cnc"] del config["http_port"] - return iocs @module("necurs") -def parse_necurs(config: Dict[str, Any]) -> IocCollection: - iocs = IocCollection() +def parse_necurs(config: Dict[str, Any], iocs: IocCollection) -> None: add_rsa_key(iocs, config, "c2_public_key") - return iocs @module("isfb") -def parse_isfb(config: Dict[str, Any]) -> IocCollection: - iocs = IocCollection() +def parse_isfb(config: Dict[str, Any], iocs: IocCollection) -> None: if "key" in config: # "key" key is a serpent key iocs.add_key("serpent", "key") del config["key"] - return iocs @module("guloader") -def parse_guloader(config: Dict[str, Any]) -> IocCollection: - iocs = IocCollection() +def parse_guloader(config: Dict[str, Any], iocs: IocCollection) -> None: if "key" in config: # "key" key is a xor key iocs.add_key("xor", "key") del config["key"] - return iocs @module("pushdo") -def parse_pushdo(config: Dict[str, Any]) -> IocCollection: - iocs = IocCollection() +def parse_pushdo(config: Dict[str, Any], iocs: IocCollection) -> None: if "cfgkey" in config: add_rsa_key(iocs, config, "cfgkey") del config["cfgkey"] - return iocs @module("locky") -def parse_locky(config: Dict[str, Any]) -> IocCollection: - iocs = IocCollection() +def parse_locky(config: Dict[str, Any], iocs: IocCollection) -> None: for payment_domain in config["payment_domain"]: iocs.try_add_url(payment_domain) - return iocs @module("cerber") -def parse_cerber(config: Dict[str, Any]) -> IocCollection: - iocs = IocCollection() +def parse_cerber(config: Dict[str, Any], iocs: IocCollection) -> None: for dpurl in config.get("default_payment_url", []): iocs.try_add_url(dpurl) if "global_public_key" in config: iocs.try_add_rsa_from_base64(config["global_public_key"]) - return iocs @module("kbot") -def parse_kbot(config: Dict[str, Any]) -> IocCollection: - iocs = IocCollection() +def parse_kbot(config: Dict[str, Any], iocs: IocCollection) -> None: if "public_key" in config: pk = config["public_key"] if isinstance(pk, list) and pk and isinstance(pk[-1], int): @@ -347,20 +308,15 @@ def parse_kbot(config: Dict[str, Any]) -> IocCollection: iocs.add_key("other", config["serverpub"]) if "botcommunity" in config: iocs.add_campaign_id(config["botcommunity"]) - return iocs @module("alien") -def parse_alien(config: Dict[str, Any]) -> IocCollection: - iocs = IocCollection() +def parse_alien(config: Dict[str, Any], iocs: IocCollection) -> None: add_url(iocs, config, "C2 alt") - return iocs @module("lockbit") -def parse_lockbit(config: Dict[str, Any]) -> IocCollection: - iocs = IocCollection() - +def parse_lockbit(config: Dict[str, Any], iocs: IocCollection) -> None: # as far as I can tell, this is a custom format used by lockbit if "rsa_pub" in config: try: @@ -374,13 +330,9 @@ def parse_lockbit(config: Dict[str, Any]) -> IocCollection: except Exception: pass - return iocs - @module("agenttesla") -def parse_agenttesla(config: Dict[str, Any]) -> IocCollection: - iocs = IocCollection() - +def parse_agenttesla(config: Dict[str, Any], iocs: IocCollection) -> None: if config.get("email"): iocs.add_email_from(config["email"]) del config["email"] @@ -389,13 +341,9 @@ def parse_agenttesla(config: Dict[str, Any]) -> IocCollection: iocs.add_email_to(config["email_to"]) del config["email_to"] - return iocs - @module("formbook") -def parse_formbook(config: Dict[str, Any]) -> IocCollection: - iocs = IocCollection() - +def parse_formbook(config: Dict[str, Any], iocs: IocCollection) -> None: if "urls" in config: del config["urls"] @@ -403,13 +351,9 @@ def parse_formbook(config: Dict[str, Any]) -> IocCollection: iocs.try_add_url(config["c2_url"]) del config["c2_url"] - return iocs - @module("cobaltstrike") -def parse_cobaltstrike(config: Dict[str, Any]) -> IocCollection: - iocs = IocCollection() - +def parse_cobaltstrike(config: Dict[str, Any], iocs: IocCollection) -> None: if config.get("payload_type", "").endswith("stager"): for url_row in config.get("stager_url", []): url = url_row["url"] @@ -426,5 +370,3 @@ def parse_cobaltstrike(config: Dict[str, Any]) -> IocCollection: iocs.try_add_url(f"{scheme}://{hostname}:{port}{path}") del config["urls"] - - return iocs diff --git a/tests/test_parse_regression.py b/tests/test_parse_regression.py index fab2cc7..1154209 100644 --- a/tests/test_parse_regression.py +++ b/tests/test_parse_regression.py @@ -33,6 +33,10 @@ def test_regression(self): print(iocs.prettyprint()) self.assertTrue(False) + # Ensure conversions doesn't throw an exception + # iocs.to_misp() # TODO: this doesn't pass the tests currently + iocs.to_maco() + if __name__ == "__main__": unittest.main()