Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add initial mako support #38

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions src/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@


def parse(family: str, config: Dict[str, Any]) -> IocCollection:
iocs = IocCollection()
"""Parse a mwdb static config of the given family, and get a IocCollection

:param family: Family this config belongs to
:param config: MWDB configuration dict"""
iocs = IocCollection(family)
if family in modules.modules:
iocs = modules.modules[family](config)
modules.modules[family](config, iocs)

modules.parse(config, iocs)

return iocs
80 changes: 78 additions & 2 deletions src/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from urllib.parse import urlparse

from Cryptodome.PublicKey import RSA # type: ignore
from maco import model
from malduck import base64, rsa # type: ignore
from pymisp import MISPAttribute, MISPObject # type: ignore

Expand Down Expand Up @@ -201,8 +202,9 @@ def prettyprint(self) -> str:
class IocCollection:
"""Represents a collection of parsed IoCs"""

def __init__(self) -> None:
def __init__(self, family: str) -> None:
"""Creates an empty IocCollection instance"""
self.family = family
self.rsa_keys: List[RsaKey] = []
self.ecdsa_curves: List[EcdsaCurve] = []
self.keys: List[Tuple[str, str]] = [] # (keytype, hexencoded key)
Expand Down Expand Up @@ -331,9 +333,83 @@ def to_misp(self) -> List[MISPObject]:

# filter out objects without any attributes
to_return = list(filter(lambda x: bool(x.attributes), to_return))

return to_return

def to_maco(self) -> model.ExtractorModel:
output = model.ExtractorModel(family=self.family)

for key in self.rsa_keys:
obj = model.ExtractorModel.Encryption(
algorithm="rsa",
public_key=str((key.n, key.e)),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the recommended way to serialize a RSA public key? As far as I can see, I can only pass str

)
if key.d:
obj.key = str((key.n, key.d))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same question goes here

output.encryption.append(obj)

for curve in self.ecdsa_curves:
output.encryption.append(
model.ExtractorModel.Encryption(
algorithm=curve.t, # for example, "ecdsa_pub_p384"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a convention for naming this types (we don't have an enum ourselves, though we map ecdsa_pub_p384 to something that MISP wants)

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So far I don't believe we have a naming convention for the algorithm field

public_key=str((curve.x, curve.y)),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and here

)
)

for key in self.keys:
output.encryption.append(
model.ExtractorModel.Encryption(
algorithm=key[0],
key=key[1],
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hexencoded

)
)

for password in self.passwords:
output.password.append(password)

def location_type_to_maco(location_type: LocationType) -> str:
if location_type in [LocationType.CNC, LocationType.PANEL]:
# Panel is not 100% technically correct here
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Panel should be close enough to C2 to do this mapping

return "c2"
elif location_type == LocationType.DOWNLOAD_URL:
return "download"
elif location_type in [LocationType.OTHER, LocationType.PEER]:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No way to add a peer I think (it's not useful nowadays anyway)

return "other"
else:
raise ValueError(f"Unknown location type: {location_type}")

for netloc in self.network_locations:
if netloc.scheme in ["https", "http"]:
output.http.append(
model.ExtractorModel.Http(
protocol=netloc.scheme,
uri=netloc.url.geturl(),
usage=location_type_to_maco(netloc.location_type),
)
)
else:
output.tcp.append(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't keep track of connection type for non-http connections, sadly. But I can't think of a situation where this was not TCP

model.ExtractorModel.Connection(
server_ip=netloc.url.hostname,
server_port=netloc.port,
)
)

for mutex in self.mutexes:
output.mutex.append(mutex)

for filename in self.dropped_filenames:
output.paths.append(filename)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think i'm not adhering to the spirit of this field - we only store filenames, not complete paths.


# Not supported by Maco
# for email in self.emails_to + self.emails_from:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For example, for credentials stealing

# output.emails.append(email)
Comment on lines +403 to +405
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


# Not supported by Maco
# for message in self.ransom_messages:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

used by ransomware

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps this could be something added to decoded_strings field or perhaps we should introduce a body field to the SMTP model to maintain the context of the string as being a ransomware-related to an email?

# output.messages.append(message)

return output.model_dump(exclude_defaults=True)

def prettyprint(self) -> str:
"""Pretty print for debugging"""
result = []
Expand Down
106 changes: 24 additions & 82 deletions src/modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,183 +161,144 @@ def parse(config: Dict[str, Any], iocs: IocCollection) -> None:


@module("netwire")
def parse_netwire(config: Dict[str, Any]) -> IocCollection:
def parse_netwire(config: Dict[str, Any], iocs: IocCollection) -> None:
if "mutex" in config and isinstance(config["mutex"], bool):
# netwire "mutex" is bool for some reason
del config["mutex"]
return IocCollection()


@module("quasarrat")
def parse_quasarrat(config: Dict[str, Any]) -> IocCollection:
iocs = IocCollection()
def parse_quasarrat(config: Dict[str, Any], iocs: IocCollection) -> None:
if "encryption_key" in config:
iocs.add_password(config["encryption_key"])
del config["encryption_key"]

if "install_name" in config:
iocs.add_drop_filename(config["install_name"])

return iocs


@module("hawkeye")
def parse_hawkeye(config: Dict[str, Any]) -> IocCollection:
iocs = IocCollection()

def parse_hawkeye(config: Dict[str, Any], iocs: IocCollection) -> None:
if config.get("EmailUsername"):
iocs.add_email_to(config["EmailUsername"])

return iocs


@module("trickbot")
def parse_trickbot(config: Dict[str, Any]) -> IocCollection:
iocs = IocCollection()
def parse_trickbot(config: Dict[str, Any], iocs: IocCollection) -> None:
if config.get("public_key"):
ecdsa = config["public_key"]
iocs.add_ecdsa_curve(
EcdsaCurve(ecdsa["t"], int(ecdsa["x"]), int(ecdsa["y"])),
)
del config["public_key"]
return iocs


@module("ramnit")
def parse_ramnit(config: Dict[str, Any]) -> IocCollection:
iocs = IocCollection()
def parse_ramnit(config: Dict[str, Any], iocs: IocCollection) -> None:
for domain in config.get("hardcoded_domain", []):
iocs.try_add_url(domain)
return iocs


@module("legionloader")
def parse_legionloader(config: Dict[str, Any]) -> IocCollection:
iocs = IocCollection()
def parse_legionloader(config: Dict[str, Any], iocs: IocCollection) -> None:
if "stealer" in config:
iocs.try_add_url(config["stealer"])
for drop in config.get("drops", []):
iocs.try_add_url(drop, location_type=LocationType.DOWNLOAD_URL)
return iocs


@module("panda")
def parse_panda(config: Dict[str, Any]) -> IocCollection:
iocs = IocCollection()
def parse_panda(config: Dict[str, Any], iocs: IocCollection) -> None:
if "comm_public_key" in config:
if type(config["comm_public_key"]) == str:
iocs.try_add_rsa_from_pem(config["comm_public_key"])
elif type(config["comm_public_key"]) == dict:
key = config["comm_public_key"]
iocs.add_rsa_key(RsaKey(int(key["n"]), int(key["e"])))
return iocs


@module("danabot")
def parse_vjworm(config: Dict[str, Any]) -> IocCollection:
iocs = IocCollection()
def parse_vjworm(config: Dict[str, Any], iocs: IocCollection) -> None:
if "rsa_key" in config:
iocs.try_add_rsa_from_base64(config["rsa_key"])
del config["rsa_key"]
return iocs


@module("nymaim")
def parse_nymaim(config: Dict[str, Any]) -> IocCollection:
iocs = IocCollection()
def parse_nymaim(config: Dict[str, Any], iocs: IocCollection) -> None:
if "urls" in config:
for url in config["urls"]:
url = url.replace("]", "") # some mistakes cannot be unmade
iocs.try_add_url(url)
del config["urls"]
return iocs


@module("zeus")
def parse_zeus(config: Dict[str, Any]) -> IocCollection:
iocs = IocCollection()
def parse_zeus(config: Dict[str, Any], iocs: IocCollection) -> None:
if "rc4sbox" in config:
iocs.add_key("rc4", config["rc4sbox"])
return iocs


@module("vmzeus")
def parse_vmzeus(config: Dict[str, Any]) -> IocCollection:
iocs = IocCollection()
def parse_vmzeus(config: Dict[str, Any], iocs: IocCollection) -> None:
if "rc4sbox" in config:
iocs.add_key("rc4", config["rc4sbox"])
if "rc6sbox" in config:
iocs.add_key("rc6", config["rc6sbox"])
return iocs


@module("sendsafe")
def parse_sendsafe(config: Dict[str, Any]) -> IocCollection:
iocs = IocCollection()
def parse_sendsafe(config: Dict[str, Any], iocs: IocCollection) -> None:
if "cnc" in config and "http_port" in config:
iocs.add_host_port(config["cnc"], int(config["http_port"]))
del config["cnc"]
del config["http_port"]
return iocs


@module("necurs")
def parse_necurs(config: Dict[str, Any]) -> IocCollection:
iocs = IocCollection()
def parse_necurs(config: Dict[str, Any], iocs: IocCollection) -> None:
add_rsa_key(iocs, config, "c2_public_key")
return iocs


@module("isfb")
def parse_isfb(config: Dict[str, Any]) -> IocCollection:
iocs = IocCollection()
def parse_isfb(config: Dict[str, Any], iocs: IocCollection) -> None:
if "key" in config:
# "key" key is a serpent key
iocs.add_key("serpent", "key")
del config["key"]
return iocs


@module("guloader")
def parse_guloader(config: Dict[str, Any]) -> IocCollection:
iocs = IocCollection()
def parse_guloader(config: Dict[str, Any], iocs: IocCollection) -> None:
if "key" in config:
# "key" key is a xor key
iocs.add_key("xor", "key")
del config["key"]
return iocs


@module("pushdo")
def parse_pushdo(config: Dict[str, Any]) -> IocCollection:
iocs = IocCollection()
def parse_pushdo(config: Dict[str, Any], iocs: IocCollection) -> None:
if "cfgkey" in config:
add_rsa_key(iocs, config, "cfgkey")
del config["cfgkey"]
return iocs


@module("locky")
def parse_locky(config: Dict[str, Any]) -> IocCollection:
iocs = IocCollection()
def parse_locky(config: Dict[str, Any], iocs: IocCollection) -> None:
for payment_domain in config["payment_domain"]:
iocs.try_add_url(payment_domain)
return iocs


@module("cerber")
def parse_cerber(config: Dict[str, Any]) -> IocCollection:
iocs = IocCollection()
def parse_cerber(config: Dict[str, Any], iocs: IocCollection) -> None:
for dpurl in config.get("default_payment_url", []):
iocs.try_add_url(dpurl)
if "global_public_key" in config:
iocs.try_add_rsa_from_base64(config["global_public_key"])
return iocs


@module("kbot")
def parse_kbot(config: Dict[str, Any]) -> IocCollection:
iocs = IocCollection()
def parse_kbot(config: Dict[str, Any], iocs: IocCollection) -> None:
if "public_key" in config:
pk = config["public_key"]
if isinstance(pk, list) and pk and isinstance(pk[-1], int):
Expand All @@ -347,20 +308,15 @@ def parse_kbot(config: Dict[str, Any]) -> IocCollection:
iocs.add_key("other", config["serverpub"])
if "botcommunity" in config:
iocs.add_campaign_id(config["botcommunity"])
return iocs


@module("alien")
def parse_alien(config: Dict[str, Any]) -> IocCollection:
iocs = IocCollection()
def parse_alien(config: Dict[str, Any], iocs: IocCollection) -> None:
add_url(iocs, config, "C2 alt")
return iocs


@module("lockbit")
def parse_lockbit(config: Dict[str, Any]) -> IocCollection:
iocs = IocCollection()

def parse_lockbit(config: Dict[str, Any], iocs: IocCollection) -> None:
# as far as I can tell, this is a custom format used by lockbit
if "rsa_pub" in config:
try:
Expand All @@ -374,13 +330,9 @@ def parse_lockbit(config: Dict[str, Any]) -> IocCollection:
except Exception:
pass

return iocs


@module("agenttesla")
def parse_agenttesla(config: Dict[str, Any]) -> IocCollection:
iocs = IocCollection()

def parse_agenttesla(config: Dict[str, Any], iocs: IocCollection) -> None:
if config.get("email"):
iocs.add_email_from(config["email"])
del config["email"]
Expand All @@ -389,27 +341,19 @@ def parse_agenttesla(config: Dict[str, Any]) -> IocCollection:
iocs.add_email_to(config["email_to"])
del config["email_to"]

return iocs


@module("formbook")
def parse_formbook(config: Dict[str, Any]) -> IocCollection:
iocs = IocCollection()

def parse_formbook(config: Dict[str, Any], iocs: IocCollection) -> None:
if "urls" in config:
del config["urls"]

if "c2_url" in config:
iocs.try_add_url(config["c2_url"])
del config["c2_url"]

return iocs


@module("cobaltstrike")
def parse_cobaltstrike(config: Dict[str, Any]) -> IocCollection:
iocs = IocCollection()

def parse_cobaltstrike(config: Dict[str, Any], iocs: IocCollection) -> None:
if config.get("payload_type", "").endswith("stager"):
for url_row in config.get("stager_url", []):
url = url_row["url"]
Expand All @@ -426,5 +370,3 @@ def parse_cobaltstrike(config: Dict[str, Any]) -> IocCollection:
iocs.try_add_url(f"{scheme}://{hostname}:{port}{path}")

del config["urls"]

return iocs
Loading
Loading