forked from compose/governor
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathgovernor.py
executable file
·87 lines (75 loc) · 2.64 KB
/
governor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
#!/usr/bin/env python
import sys, os, yaml, time, urllib2, atexit
import logging
from helpers.etcd import Etcd
from helpers.postgresql import Postgresql
from helpers.ha import Ha
logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s', level=logging.INFO)

# the path of the YAML config file is the first command-line argument;
# fail with a usage message instead of a bare IndexError when it is missing
if len(sys.argv) < 2:
    sys.exit("usage: %s <config.yml>" % sys.argv[0])

# the context manager closes the file even if reading or parsing raises
# NOTE(review): yaml.load can construct arbitrary Python objects; that is
# only acceptable while the config file is operator-controlled. Consider
# yaml.safe_load if the file could ever come from a less trusted source.
with open(sys.argv[1], "r") as f:
    config = yaml.load(f.read())

# build the helpers from their respective config sections; Ha is handed
# both the postgresql and the etcd helper
etcd = Etcd(config["etcd"])
postgresql = Postgresql(config["postgresql"])
ha = Ha(postgresql, etcd)
# keep registering this member until etcd answers
while True:
    try:
        etcd.touch_member(postgresql.name, postgresql.connection_string)
    except urllib2.URLError:
        # the request failed with URLError: log it and retry in 5 seconds
        logging.info("waiting on etcd")
        time.sleep(5)
    else:
        # registration went through, startup can continue
        break
# bring the local postgresql up, depending on whether it already has data
if not postgresql.data_directory_empty():
    # existing data directory: decide our role from the current leader key
    leader = etcd.current_leader()
    if leader is None:
        # NOTE(review): nesting reconstructed from a flattened source; start()
        # is taken to belong to this no-leader branch only -- confirm
        postgresql.follow_no_leader()
        postgresql.start()
    elif leader['hostname'] == postgresql.name:
        # the leader key still names this node
        postgresql.start()
    else:
        # another node is the leader; no explicit start() in this branch
        postgresql.follow_the_leader(leader)
elif etcd.race("/initialize", postgresql.name):
    # empty data directory and we won the initialize race:
    # create the cluster, claim leadership, then start
    postgresql.initialize()
    etcd.take_leader(postgresql.name)
    postgresql.start()
else:
    # empty data directory and the race was lost: retry every 5 seconds
    # until a leader exists and syncing from it succeeds
    while True:
        leader = etcd.current_leader()
        if leader and postgresql.sync_from_leader(leader):
            postgresql.write_recovery_conf(leader)
            # wait two loop intervals so the leader can create the
            # replication slot before we start
            time.sleep(config["loop_wait"] * 2)
            postgresql.start()
            break
        time.sleep(5)
# statement that creates the physical replication slot named %(slot)s
# unless pg_replication_slots already has a row with that slot_name
# NOTE(review): the slot name is interpolated into the SQL text as-is, so
# it relies on member hostnames containing no quotes or '$$' -- confirm
CREATE_SLOT_IF_MISSING = """
DO LANGUAGE plpgsql $$
DECLARE somevar VARCHAR;
BEGIN
SELECT slot_name INTO somevar
FROM pg_replication_slots
WHERE slot_name = '%(slot)s'
LIMIT 1;
IF NOT FOUND THEN
PERFORM pg_create_physical_replication_slot('%(slot)s');
END IF;
END$$;"""

# main loop: refresh our member entry, run one HA cycle, then sleep
while True:
    etcd.touch_member(postgresql.name, postgresql.connection_string)
    logging.info(ha.run_cycle())

    # while this node is the leader, make sure every other member has a slot
    if postgresql.is_leader():
        for member in etcd.members():
            slot_name = member['hostname']
            if slot_name == postgresql.name:
                # no slot for ourselves
                continue
            postgresql.query(CREATE_SLOT_IF_MISSING % {"slot": slot_name})

    time.sleep(config["loop_wait"])