-
Notifications
You must be signed in to change notification settings - Fork 2
/
simulation.py
131 lines (109 loc) · 5.94 KB
/
simulation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import subprocess
import time
# Banner printed once by main() before the first scenario prompt.
TITLE = """
+++ Defender for Cloud Attack Simulation +++
This simulation creates two pods - attacker and victim
The attacker pod will execute the chosen scenario on the victim
"""
# Menu printed by start_simulation(); entry N corresponds to SCENARIOS[N - 1].
MENU = """
Available scenarios:
1. Reconnaissance - Gather information about the cluster environment.
2. Lateral Movement - Cluster to cloud.
3. Secrets Gathering - Search for sensitive information in the victim pod.
4. Cryptomining - Run a cryptominer on the victim pod.
5. Webshell - Exploit a webshell on the victim pod (NOTE: the webshell is used in all scenarios).
6. All - Run all scenarios.
"""
# Scenario identifiers, in menu order; the selected one is passed to the chart
# as the `scenario` value by run_scenario().
SCENARIOS = ["recon", "lateral-mov", "secrets", "crypto", "webshell", "all"]
# Chart reference given to `helm upgrade --install`.
HELM_CHART = "oci://ghcr.io/microsoft/defender-for-cloud/attacksimulation/mdc-simulation"
# Release name used for helm upgrade/status/uninstall.
HELM_RELEASE = "mdc-simulation"
# Namespace used for every kubectl call and passed to the chart as `env.name`.
NAMESPACE = "mdc-simulation"
# Names of the two pods that run_scenario() polls with `kubectl get pod`.
ATTACKER = "mdc-simulation-attacker"
VICTIM = "mdc-simulation-victim"
def delete_resources():
    """Uninstall the Helm release, then delete the simulation namespace.

    Neither command is checked: a non-zero exit status from helm or kubectl
    is ignored, so cleanup is best effort.
    """
    teardown_commands = (
        ["helm", "uninstall", HELM_RELEASE],
        ["kubectl", "delete", "namespace", NAMESPACE],
    )
    # Order matters: the release is removed before its namespace.
    for command in teardown_commands:
        subprocess.run(command)
def _pod_jsonpath(pod, path):
    """Return the raw stdout of `kubectl get pod <pod> -o jsonpath="{<path>}"`."""
    # The double quotes are part of the jsonpath template (no shell is involved
    # to strip them), so kubectl echoes them back: callers compare against
    # quoted values such as '"Pending"'.
    result = subprocess.run(
        ["kubectl", "get", "pod", pod, "-n", NAMESPACE, "-o",
         'jsonpath="{' + path + '}"'],
        capture_output=True, text=True)
    return result.stdout


def run_scenario(scenario):
    """Installs components using helm and shows scenario results.

    Deletes a leftover attacker pod (if any), installs or upgrades the Helm
    release with the requested scenario, waits until neither pod is Pending,
    streams the attacker pod's logs and finally reports whether the simulation
    printed its completion marker.

    Args:
        scenario: Scenario identifier passed to the chart as the ``scenario``
            value (callers pass an entry of ``SCENARIOS``).

    Raises:
        FileNotFoundError: The ``helm`` or ``kubectl`` executable is missing.
        subprocess.CalledProcessError: Deleting the previous attacker pod or
            the ``helm upgrade --install`` command exited non-zero.
        RuntimeError: A pod is Pending for a reason other than
            ``ContainerCreating``, or a pod reached the ``Failed`` phase.
    """
    pending = '"Pending"'
    creating = '"ContainerCreating"'
    phase_path = ".status.phase"
    waiting_path = ".status.containerStatuses[0].state.waiting.reason"

    # Install or update the helm chart
    print("Helm - creating simulation objects in namespace mdc-simulation...")
    try:
        existing_pod = subprocess.run(
            ["kubectl", "get", "pod", ATTACKER, "-n", NAMESPACE],
            capture_output=True, text=True)
        if existing_pod.stdout:
            # A previous attacker pod would keep its old scenario; remove it
            # so the upgrade creates a fresh one.
            subprocess.run(
                ["kubectl", "delete", "pod", ATTACKER, "-n", NAMESPACE],
                check=True, capture_output=True, text=True)
        subprocess.run(
            ["helm", "upgrade", "--install", HELM_RELEASE, HELM_CHART,
             "--set", f"env.name={NAMESPACE}", "--set", f"scenario={scenario}"],
            check=True, capture_output=True, text=True)
    except FileNotFoundError as err:
        # kubectl runs before helm, so either executable may be the missing
        # one; report the actual name when the OS provides it.
        print(f"Can't find {err.filename or 'Helm or kubectl'}. Exiting")
        # Bare raise keeps the original exception (filename, errno, traceback).
        raise
    except subprocess.CalledProcessError as err:
        print("Failed to create Helm chart. Exiting")
        if err.stderr:
            # Output was captured, so surface the tool's own error text.
            print(err.stderr.strip())
        raise

    print("Creating resources...")
    attacker_phase = _pod_jsonpath(ATTACKER, phase_path)
    victim_phase = _pod_jsonpath(VICTIM, phase_path)
    while pending in (attacker_phase, victim_phase):
        time.sleep(3)
        attacker_waiting = _pod_jsonpath(ATTACKER, waiting_path)
        victim_waiting = _pod_jsonpath(VICTIM, waiting_path)
        attacker_phase = _pod_jsonpath(ATTACKER, phase_path)
        victim_phase = _pod_jsonpath(VICTIM, phase_path)
        attacker_stuck = attacker_phase == pending and attacker_waiting != creating
        victim_stuck = victim_phase == pending and victim_waiting != creating
        if attacker_stuck or victim_stuck:
            message = (
                "Failed to create one or more containers.\n"
                f"Attacker container status: {attacker_waiting},\n"
                f"Victim container status: {victim_waiting}."
            )
            print(message)
            raise RuntimeError(message)

    if '"Failed"' in (attacker_phase, victim_phase):
        message = (
            "Failed to create one or more pods.\n"
            f"Attacker pod status: {attacker_phase},\n"
            f"Victim pod status: {victim_phase}."
        )
        print(message)
        raise RuntimeError(message)

    # read the attack output
    print("Running the scenario...\n")
    try:
        # Output is not captured: the attacker's log is streamed to the console.
        subprocess.run(["kubectl", "logs", "-f", ATTACKER, "-n", NAMESPACE], timeout=90)
    except subprocess.TimeoutExpired:
        print("Scenario did not complete successfully (timeout)")
        return

    last_line = subprocess.run(
        ["kubectl", "logs", "--tail=1", ATTACKER, "-n", NAMESPACE],
        text=True, capture_output=True)
    # strip() tolerates trailing whitespace/newline differences in the log line.
    if last_line.stdout.strip() == "--- Simulation completed ---":
        print("\nScenario completed successfully.\n")
    else:
        print("Scenario did not complete successfully")
def start_simulation():
    """Interactively run scenarios until the user declines, then clean up.

    Each round prints the menu, reads a scenario number between 1 and
    ``len(SCENARIOS)`` and runs it with ``run_scenario``.

    * If a required executable is missing (``FileNotFoundError``) the function
      returns immediately.
    * On any other error the Helm release is removed when ``helm status``
      reports that it exists, and the function returns.
    * Otherwise the user is asked whether to run another scenario; on "N" the
      resources are deleted after a confirmation prompt.
    """
    while True:
        print(MENU)
        answer = input("Select a scenario: ")
        # isdecimal() (unlike isnumeric()) only accepts characters that int()
        # can parse, so input such as "²" or "½" is rejected instead of
        # crashing with ValueError.
        while not (answer.isdecimal() and 1 <= int(answer) <= len(SCENARIOS)):
            print("Invalid input")
            answer = input("Select a scenario: ")
        choice = int(answer)

        try:
            run_scenario(SCENARIOS[choice - 1])
        except FileNotFoundError:
            # helm/kubectl is unavailable, so no cleanup command can run either.
            return
        except Exception:
            # run_scenario already reported the failure; only clean up a
            # release that actually got installed.
            release_status = subprocess.run(["helm", "status", HELM_RELEASE], capture_output=True)
            if release_status.returncode == 0:
                delete_resources()
            return

        again = input("Run another scenario?(Y/N): ")
        while again.upper() not in ("Y", "N"):
            print("Invalid input")
            again = input("Run another scenario?(Y/N): ")
        if again.upper() == "N":
            break
        # Remove the finished attacker pod before looping (a loop rather than
        # recursion keeps the call stack flat across many rounds).
        subprocess.run(["kubectl", "delete", "pod", ATTACKER, "-n", NAMESPACE], capture_output=True)

    input("Press Any Button to delete resources")
    delete_resources()
def main() -> None:
    """Print the title banner and start the interactive simulation."""
    print(TITLE)
    start_simulation()


# Run the simulation only when this file is executed as a script.
if __name__ == "__main__":
    main()