-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathhandle_k8s.py
161 lines (136 loc) · 5.08 KB
/
handle_k8s.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import logging
import pk_config
import time
import pykube
# Identifier under which this adaptor registers its dry-run flag in pk_config.
dryrun_id = "k8s"
# Label carried by master nodes; used to exclude them from worker queries.
MASTER = "node-role.kubernetes.io/master"
# Module-level k8s API client, created at import time from the kubeconfig on
# disk. NOTE(review): import fails if /root/.kube/config is absent — confirm
# this module is only imported on the MiCADO master.
kube = pykube.HTTPClient(pykube.KubeConfig.from_file("/root/.kube/config"))
def query_list_of_nodes(endpoint, worker_name="micado-worker", status="ready"):
    """Return worker nodes as a list of ``{"ID": ..., "Addr": ...}`` dicts.

    Args:
        endpoint: unused here; kept for interface parity with other handlers.
        worker_name: value of the ``micado.eu/node_type`` label to match
            when ``status == "ready"``.
        status: ``"ready"`` selects untainted workers with the given label;
            ``"down"`` selects non-master nodes whose Ready condition is
            ``Unknown``. Any other value yields an empty list.

    Returns:
        List of dicts with the node name (``ID``) and first address
        (``Addr``); an empty list on query failure or in dry-run mode a
        single dummy entry.
    """
    log = logging.getLogger("pk_k8s")
    list_of_nodes = []
    if pk_config.dryrun_get(dryrun_id):
        log.info("(I) DRYRUN enabled. Skipping...")
        list_of_nodes.append({"ID": "dummyID", "Addr": "127.0.0.1"})
        return list_of_nodes
    try:
        if status == "ready":
            query = pykube.Node.objects(kube).filter(
                selector={"micado.eu/node_type__in": {worker_name}}
            )
            # Tainted nodes (e.g. cordoned/master) are not usable workers.
            nodes = [x for x in query if "taints" not in x.obj["spec"]]
        elif status == "down":
            nodes = []
            worker_nodes = [
                x for x in pykube.Node.objects(kube) if MASTER not in x.labels
            ]
            for node in worker_nodes:
                # BUG FIX: original indexed [0] into the matching conditions,
                # raising IndexError for a node without a Ready condition.
                ready = next(
                    (
                        c
                        for c in node.obj["status"]["conditions"]
                        if c.get("type") == "Ready"
                    ),
                    None,
                )
                # "Unknown" status means the kubelet stopped reporting.
                if ready is not None and ready.get("status") == "Unknown":
                    nodes.append(node)
        else:
            # BUG FIX: an unrecognised status left `nodes` unbound, causing a
            # NameError that the broad except silently swallowed.
            nodes = []
        for n in nodes:
            n.reload()
            list_of_nodes.append(
                {
                    "ID": n.metadata["name"],
                    "Addr": n.obj["status"]["addresses"][0]["address"],
                }
            )
        return list_of_nodes
    except Exception:
        log.exception("(Q) Query of k8s nodes failed.")
        # BUG FIX: was `return dict()` — inconsistent with the list return
        # type every caller iterates over.
        return []
def scale_k8s_deploy(endpoint, service_name, replicas):
    """Scale the k8s Deployment behind *service_name* to *replicas* pods.

    The incoming compose-style name (``stack_service``) is converted to the
    Deployment name by dropping the first ``_``-separated segment and joining
    the rest with ``-``. Failures are logged as warnings (best effort).
    """
    service_name = "-".join(service_name.split("_")[1:])
    log = logging.getLogger("pk_k8s")
    log.info("(S) => m_container_count: {0}".format(replicas))
    if pk_config.dryrun_get(dryrun_id):
        log.info("(S) DRYRUN enabled. Skipping...")
        return
    try:
        matches = pykube.Deployment.objects(kube).filter(
            field_selector={"metadata.name": service_name}
        )
        target = list(matches)[0]
        target.reload()
        target.scale(replicas)
    except Exception as e:
        log.warning(
            '(S) Scaling of k8s service "{0}" failed: {1}'.format(service_name, str(e))
        )
def query_k8s_replicas(endpoint, service_name):
    """Return the replica count of the Deployment behind *service_name*.

    The compose-style name is mapped to the Deployment name the same way as
    in scaling (drop the first ``_`` segment, join with ``-``). On any query
    failure a warning is logged and the default of 1 is returned.
    """
    service_name = "-".join(service_name.split("_")[1:])
    log = logging.getLogger("pk_k8s")
    instance = 1
    if pk_config.dryrun_get(dryrun_id):
        log.info("(I) DRYRUN enabled. Skipping...")
        return instance
    try:
        matches = pykube.Deployment.objects(kube).filter(
            field_selector={"metadata.name": service_name}
        )
        target = list(matches)[0]
        target.reload()
        instance = target.replicas
        log.debug(
            "(I) => m_container_count for {0}: {1}".format(service_name, instance)
        )
    except Exception as e:
        log.warning(
            '(Q) Querying k8s service "{0}" replicas failed: {1}'.format(
                service_name, str(e)
            )
        )
    return instance
down_nodes_stored = {}
def remove_node(endpoint, id):
    """Delete the k8s node named *id* from the cluster (best effort).

    Errors are logged and swallowed; the caller is not notified of failure.
    (``id`` shadows the builtin but is kept for interface compatibility.)
    """
    log = logging.getLogger("pk_k8s")
    if pk_config.dryrun_get(dryrun_id):
        log.info("(M) DRYRUN enabled. Skipping...")
        return
    try:
        matches = pykube.Node.objects(kube).filter(
            field_selector={"metadata.name": id}
        )
        target = list(matches)[0]
        target.reload()
        target.delete()
    except Exception:
        log.error("(M) => Removing k8s node failed.")
def down_nodes_cleanup_by_list(stored, actual):
    """Drop entries from *stored* whose ID is absent from *actual*.

    *stored* maps node ID -> node dict; *actual* is a list of node dicts
    each carrying an ``"ID"`` key. Mutates *stored* in place.
    """
    actual_ids = {node["ID"] for node in actual}
    stale_ids = [entry["ID"] for entry in stored.values() if entry["ID"] not in actual_ids]
    for node_id in stale_ids:
        del stored[node_id]
def down_nodes_add_from_list(stored, actual):
    """Register newly-seen down nodes in *stored*, stamping first-seen time.

    Nodes already present keep their original timestamp. Note: the node dict
    from *actual* is stored by reference and gains a ``micado_timestamp``
    key (this mutation is visible to the caller's list, as in the original).
    """
    now = int(time.time())
    for entry in actual:
        node_id = entry.get("ID")
        if node_id is None or node_id in stored:
            continue
        stored[node_id] = entry
        stored[node_id]["micado_timestamp"] = now
def down_nodes_cleanup_by_timeout(endpoint, stored, timeout):
    """Remove nodes that have stayed down for more than *timeout* seconds.

    For each expired entry the k8s node is deleted via :func:`remove_node`
    and the entry is dropped from *stored* (mutated in place).

    Args:
        endpoint: passed through to ``remove_node``.
        stored: dict of node ID -> node dict with a ``micado_timestamp`` key.
        timeout: grace period in seconds before a down node is removed.
    """
    log = logging.getLogger("pk_k8s")
    current_time = int(time.time())
    # Iterate over a snapshot so entries can be deleted while looping.
    # (Original used `list(...)[:]`, which copied the snapshot a second
    # time for no benefit — list() alone already detaches from `stored`.)
    for id, node in list(stored.items()):
        if node["micado_timestamp"] + timeout < current_time:
            log.info(
                "(M) => Node {0} is down for more than {1} seconds, removing.".format(
                    id, timeout
                )
            )
            remove_node(endpoint, id)
            del stored[id]
def down_nodes_maintenance(endpoint, down_nodes_timeout=120):
    """One maintenance pass over down nodes: sync, register, expire.

    Queries the nodes currently reported down, reconciles the module-level
    ``down_nodes_stored`` registry against that list (dropping recovered
    nodes, adding new ones), then removes any node that has been down longer
    than *down_nodes_timeout* seconds.
    """
    log = logging.getLogger("pk_k8s")
    if pk_config.dryrun_get(dryrun_id):
        log.info("(M) DRYRUN enabled. Skipping...")
        return
    currently_down = query_list_of_nodes(endpoint, status="down")
    down_nodes_cleanup_by_list(down_nodes_stored, currently_down)
    down_nodes_add_from_list(down_nodes_stored, currently_down)
    down_nodes_cleanup_by_timeout(endpoint, down_nodes_stored, down_nodes_timeout)