#!/usr/bin/env python3
"""Chaos monkey randomized fault injector."""
# pylint: disable=duplicate-code
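# Example invocation (flag values are illustrative; run with --help for the
# full option list; the monitored deployment below is just an example):
#   ./chaos_runner.py --mttf 300 \
#       --check-interval 60 \
#       --monitor-deployment openshift-storage/rook-ceph-operator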
import argparse
import logging
import os
import random
import time
from typing import List, Optional

import failure
import failure_ocs
import kube
import util

RUN_ID = random.randrange(999999999)

STEADY_STATE_DEPLOYMENTS: List[str] = []


def verify_steady_state() -> bool:
    """Verify the steady state hypothesis."""
    for deploy in STEADY_STATE_DEPLOYMENTS:
        [namespace, name] = deploy.split("/")
        if not kube.deployment_is_ready(namespace, name):
            logging.error("deployment %s failed readiness check", deploy)
            assert False, "ABORT"
        else:
            logging.info("deployment %s is healthy", deploy)
    return True


def get_failure(types: List[failure.FailureType]) -> failure.Failure:
    """Get a failure instance that is safe to invoke."""
    random.shuffle(types)
    for fail_type in types:
        try:
            instance = fail_type.get()
            return instance
        except failure.NoSafeFailures:
            pass
    raise failure.NoSafeFailures


def await_mitigation(instance: failure.Failure,
                     timeout: float) -> bool:
    """Wait for a failure to be mitigated."""
    logging.info("awaiting mitigation")
    time_remaining = timeout
    sleep_time = 10
    while time_remaining > 0 and not instance.mitigated():
        verify_steady_state()
        time.sleep(sleep_time)
        time_remaining -= sleep_time
    # Make sure the SUT has recovered (and not timed out)
    return instance.mitigated()


def await_next_failure(mttf: float, check_interval: float) -> None:
    """Pause until the next failure."""
    logging.info("pausing before next failure")
    ss_last_check = 0.0
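    # Each one-second iteration below has a 1/mttf chance of ending the pause,
    # so the wait is (roughly) geometrically distributed with a mean of about
    # `mttf` seconds.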
    while random.random() > (1/mttf):
        if time.time() > ss_last_check + check_interval:
            verify_steady_state()
            ss_last_check = time.time()
        time.sleep(1)


def main() -> None:
"""Inject randomized faults."""
parser = argparse.ArgumentParser()
parser.add_argument("--additional-failure",
default=0.25,
type=float,
help="Probability of having an additional simultaneous failure [0,1).")
parser.add_argument("--check-interval",
default=30,
type=float,
help="Steady-state check interval (sec)")
parser.add_argument("--cephcluster-name",
default="ocs-storagecluster-cephcluster",
type=str,
help="Name of the cephcluster object")
parser.add_argument("-l", "--log-dir",
default=os.getcwd(),
type=str,
help="Path to use for log files")
parser.add_argument("--mitigation-timeout",
default=15 * 60,
type=float,
help="Failure mitigation timeout (sec).")
parser.add_argument("--mttf",
default=150,
type=float,
help="Mean time to failure (sec).")
parser.add_argument("--ocs-namespace",
default="openshift-storage",
type=str,
help="Namespace where the OCS components are running")
parser.add_argument("--monitor-deployment",
action="append",
type=str,
help="namespace/name of a deployment's health to "
"monitor as part of steady-state hypothesis")
    cli_args = parser.parse_args()

    assert (cli_args.additional_failure >= 0 and cli_args.additional_failure < 1), \
        "Additional failure probability must be in the range [0,1)"
    assert cli_args.mttf > 0, "mttf must be greater than 0"
    assert cli_args.mitigation_timeout > 0, "mitigation timeout must be greater than 0"
    assert cli_args.check_interval > 0, "steady-state check interval must be greater than 0"

    global STEADY_STATE_DEPLOYMENTS  # pylint: disable=global-statement
    if cli_args.monitor_deployment is not None:
        for deploy in cli_args.monitor_deployment:
            ns_name = deploy.split("/")
            assert len(ns_name) == 2, "--monitor-deployment must be in namespace/name format"
        STEADY_STATE_DEPLOYMENTS = cli_args.monitor_deployment

    log_dir = os.path.join(cli_args.log_dir, f'ocs-monkey-chaos-{RUN_ID}')
    util.setup_logging(log_dir)
    logging.info("starting execution-- run id: %d", RUN_ID)
    logging.info("program arguments: %s", cli_args)
    logging.info("log directory: %s", log_dir)
    logging.info("monitoring health of %d Deployments", len(STEADY_STATE_DEPLOYMENTS))

    cephcluster = failure_ocs.CephCluster(cli_args.ocs_namespace,
                                          cli_args.cephcluster_name)

    # Assemble list of potential FailureTypes to induce
    failure_types: List[failure.FailureType] = [
        # CSI driver component pods
        failure_ocs.DeletePodType(namespace=cli_args.ocs_namespace,
                                  labels={"app": "csi-rbdplugin"},
                                  cluster=cephcluster),
        failure_ocs.DeletePodType(namespace=cli_args.ocs_namespace,
                                  labels={"app": "csi-rbdplugin-provisioner"},
                                  cluster=cephcluster),
        # ceph component pods
        failure_ocs.DeletePodType(namespace=cli_args.ocs_namespace,
                                  labels={"app": "rook-ceph-mon"},
                                  cluster=cephcluster),
        failure_ocs.DeletePodType(namespace=cli_args.ocs_namespace,
                                  labels={"app": "rook-ceph-osd"},
                                  cluster=cephcluster),
        # operator component pods
        failure_ocs.DeletePodType(namespace=cli_args.ocs_namespace,
                                  labels={"app": "rook-ceph-operator"},
                                  cluster=cephcluster),
        failure_ocs.DeletePodType(namespace=cli_args.ocs_namespace,
                                  labels={"name": "ocs-operator"},
                                  cluster=cephcluster),
    ]
    # A list of the outstanding failures that we (may) need to repair. New
    # failures are appended, and repairs are done from the end as well (i.e.,
    # it's a stack).
    pending_repairs: List[failure.Failure] = []
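    # Main chaos loop: inject a failure (sometimes several simultaneously),
    # wait for the system to mitigate it, repair the infrastructure, then
    # pause until it's time for the next failure.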
    while True:
        fail_instance: Optional[failure.Failure] = None
        try:
            fail_instance = get_failure(failure_types)
            logging.info("invoking failure: %s", fail_instance)
            fail_instance.invoke()
            pending_repairs.append(fail_instance)
        except failure.NoSafeFailures:
            pass
        if random.random() > cli_args.additional_failure or not fail_instance:
            # don't cause more simultaneous failures
            if fail_instance:
                # This shouldn't be an assert... but what should we do?
                assert await_mitigation(fail_instance, cli_args.mitigation_timeout)
            verify_steady_state()
            # Repair the infrastructure from all the failures, starting w/ most
            # recent and working back.
            logging.info("making repairs")
            pending_repairs.reverse()
            for repair in pending_repairs:
                repair.repair()
            pending_repairs.clear()
            verify_steady_state()
            # After all repairs have been made, ceph should become healthy
            logging.info("waiting for ceph cluster to be healthy")
            assert cephcluster.is_healthy(cli_args.mitigation_timeout)
            # Wait until it's time for next failure, monitoring steady-state
            # periodically
            await_next_failure(cli_args.mttf, cli_args.check_interval)


if __name__ == '__main__':
    main()