-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathintegration_test.py
206 lines (162 loc) · 8.51 KB
/
integration_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
import errno
import os
import signal
import subprocess
import tempfile
import unittest
from select import select

import cjson
# Absolute directory containing this test module.
HERE = os.path.split(os.path.abspath(__file__))[0]

# When true, listener log lines and temporary file names are printed.
debug = False
class TestGlowIntegration(unittest.TestCase):
    """Integration tests that run the ``glow`` executable found next to this file.

    Each test submits work through the ``glow`` command line and, where a
    listener is involved, watches the listener's log output for job events.
    """

    def setUp(self):
        # Drain every tube reported by `glow -stats` before the test runs.
        for tube in tubes():
            drain(tube)
        self.listener = Listener()

    def tearDown(self):
        # Listener.kill() is a no-op when the listener was never started.
        self.listener.kill()

    def test_listener_runs_job(self):
        tmpfilename = temporary_file_name()
        self.listener.start()
        subprocess.check_call([
            glow_executable(), '-tube', 'listener_runs_job',
            '-stdout', tmpfilename, '/bin/echo', 'listener_runs_job',
        ])
        self.listener.wait_for_job_completion({'tube': 'listener_runs_job', 'stdout': tmpfilename})
        with open(tmpfilename, 'r') as outfile:
            self.assertEqual('listener_runs_job\n', outfile.read())
        self.listener.interrupt()

    def test_local_runs_job(self):
        tmpfilename = temporary_file_name()
        subprocess.check_call([
            glow_executable(), '-local', '-stdout', tmpfilename,
            '/bin/echo', 'local_runs_job',
        ])
        with open(tmpfilename, 'r') as outfile:
            self.assertEqual('local_runs_job\n', outfile.read())

    def test_submit_many_jobs(self):
        tmpfilename1 = temporary_file_name()
        tmpfilename2 = temporary_file_name()
        self.listener.start()
        jobs = [
            {'cmd': 'echo', 'args': ['submit_many_jobs'], 'tube': 'submit_many_jobs', 'stdout': tmpfilename1},
            {'cmd': 'echo', 'args': ['submit_many_jobs'], 'tube': 'submit_many_jobs', 'stdout': tmpfilename2},
        ]
        glow = subprocess.Popen([glow_executable()], stdin=subprocess.PIPE)
        # communicate() writes the payload, closes stdin and reaps the child,
        # so the submitter is not left behind as an unwaited process.
        glow.communicate(cjson.encode(jobs) + '\n')
        # Mirrors the check_call() used for submissions in the other tests.
        self.assertEqual(0, glow.returncode)
        self.listener.wait_for_job_completion({'tube': 'submit_many_jobs', 'stdout': tmpfilename1})
        self.listener.wait_for_job_completion({'tube': 'submit_many_jobs', 'stdout': tmpfilename2})
        with open(tmpfilename1, 'r') as outfile:
            self.assertEqual('submit_many_jobs\n', outfile.read())
        with open(tmpfilename2, 'r') as outfile:
            self.assertEqual('submit_many_jobs\n', outfile.read())
        self.listener.interrupt()

    def test_listener_finishes_job_on_interrupt(self):
        tmpfilename = temporary_file_name()
        self.listener.start()
        subprocess.check_call([
            glow_executable(), '-tube', 'listener_finishes_job_on_interrupt',
            '-stdout', tmpfilename,
            '%s/sleepthenecho' % HERE, '3', 'listener_finishes_job_on_interrupt',
        ])
        job_desc = {'tube': 'listener_finishes_job_on_interrupt', 'stdout': tmpfilename}
        self.listener.wait_for_job_start(job_desc)
        # Interrupt while the job is running; it is still expected to complete.
        self.listener.interrupt()
        self.listener.wait_for_job_completion(job_desc, seconds=10)
        with open(tmpfilename, 'r') as outfile:
            self.assertEqual('listener_finishes_job_on_interrupt\n', outfile.read())

    def test_listener_kills_job_on_kill(self):
        tmpfilename = temporary_file_name()
        self.listener.start()
        subprocess.check_call([
            glow_executable(), '-tube', 'listener_kills_job_on_kill',
            '-stdout', tmpfilename,
            '%s/sleepthenecho' % HERE, '5', 'listener_kills_job_on_kill',
        ])
        self.listener.wait_for_job_start({'tube': 'listener_kills_job_on_kill', 'stdout': tmpfilename})
        self.listener.kill()
        self.listener.wait_for_shutdown()
        try:
            with open(tmpfilename, 'r') as outfile:
                self.assertNotEqual('listener_kills_job_on_kill\n', outfile.read())
        except IOError as e:
            # Ignore a file that was never created; re-raise anything else.
            if e.errno != errno.ENOENT:
                raise

    def test_unexecable_job_fails_with_error(self):
        self.assertNotIn('GLOW_ERRORS', tubes())
        self.listener.start()
        subprocess.check_call([
            glow_executable(), '-tube', 'unexecable_job_fails_with_error',
            '/nonexistent/executable',
        ])
        self.listener.wait_for_job_failure({
            'tube': 'unexecable_job_fails_with_error',
            'cmd': '/nonexistent/executable',
        })
        self.assertEqual(1, tubes()['GLOW_ERRORS']['jobs-ready'])

    def test_nonzero_exitstatus_fails_with_error(self):
        self.assertNotIn('GLOW_ERRORS', tubes())
        self.listener.start()
        subprocess.check_call([
            glow_executable(), '-tube', 'nonzero_exitstatus_fails_with_error',
            'cat', '/nonexistent/file',
        ])
        self.listener.wait_for_job_failure({
            'tube': 'nonzero_exitstatus_fails_with_error',
            'cmd': 'cat',
            'args': ['/nonexistent/file'],
        })
        self.assertEqual(1, tubes()['GLOW_ERRORS']['jobs-ready'])

    def test_local_job_failure(self):
        self.assertNotIn('GLOW_ERRORS', tubes())
        # Close the /dev/null handle once the child has exited.
        with open('/dev/null', 'w') as devnull:
            returncode = subprocess.call(
                [glow_executable(), '-local', 'cat', '/nonexistent/file'],
                stderr=devnull)
        self.assertNotEqual(0, returncode)
        self.assertEqual(1, tubes()['GLOW_ERRORS']['jobs-ready'])

    def test_unexecable_local_job_failure(self):
        self.assertNotIn('GLOW_ERRORS', tubes())
        # Close the /dev/null handle once the child has exited.
        with open('/dev/null', 'w') as devnull:
            returncode = subprocess.call(
                [glow_executable(), '-local', '/nonexistent/executable'],
                stderr=devnull)
        self.assertNotEqual(0, returncode)
        self.assertEqual(1, tubes()['GLOW_ERRORS']['jobs-ready'])

    def test_create_output_file_if_not_exists(self):
        tmpfilename = temporary_file_name()
        self.assertFalse(os.path.exists(tmpfilename))
        self.listener.start()
        subprocess.check_call([
            glow_executable(), '-tube', 'job', '-stdout', tmpfilename,
            '/bin/echo', 'job',
        ])
        self.listener.wait_for_job_completion({'tube': 'job', 'stdout': tmpfilename})
        with open(tmpfilename, 'r') as outfile:
            self.assertEqual('job\n', outfile.read())
        self.listener.interrupt()

    def test_append_to_output_file_if_exists(self):
        tmpfilename = temporary_file_name()
        self.listener.start()
        # Both jobs write to the same stdout file.
        subprocess.check_call([
            glow_executable(), '-tube', 'job1', '-stdout', tmpfilename,
            '/bin/echo', 'job1',
        ])
        subprocess.check_call([
            glow_executable(), '-tube', 'job2', '-stdout', tmpfilename,
            '/bin/echo', 'job2',
        ])
        self.listener.wait_for_job_completion({'tube': 'job2', 'stdout': tmpfilename})
        with open(tmpfilename, 'r') as outfile:
            self.assertEqual('job1\njob2\n', outfile.read())
        self.listener.interrupt()
class Listener:
    """Handle on a ``glow -listen -v`` subprocess.

    The subprocess's stderr is captured through a pipe and read line by line
    to detect job status events (``RUNNING:``, ``COMPLETE:``, ``FAILED:``).
    """

    def __init__(self):
        # Popen object for the listener; None until start() is called.
        self.process = None

    def start(self):
        """Launch the listener with its stderr connected to a pipe."""
        self.process = subprocess.Popen(
            [glow_executable(), '-listen', '-v'], stderr=subprocess.PIPE)

    def interrupt(self):
        """Send SIGINT to the listener process."""
        self.process.send_signal(signal.SIGINT)

    def kill(self):
        """Terminate the listener; does nothing if it was never started."""
        if self.process is None:
            return
        try:
            self.process.terminate()
        except OSError as e:
            # 'No such process' means it has already gone; re-raise anything else.
            if e.errno != errno.ESRCH:
                raise

    def wait_for_shutdown(self):
        """Block until the listener process has exited."""
        self.process.wait()

    def wait_for_job_start(self, job_desc, seconds=3):
        """Wait for a ``RUNNING:`` event whose job matches ``job_desc``."""
        self._wait_for_job_update(job_desc, 'RUNNING:', seconds)

    def wait_for_job_completion(self, job_desc, seconds=3):
        """Wait for a ``COMPLETE:`` event whose job matches ``job_desc``."""
        self._wait_for_job_update(job_desc, 'COMPLETE:', seconds)

    def wait_for_job_failure(self, job_desc, seconds=3):
        """Wait for a ``FAILED:`` event whose job matches ``job_desc``."""
        self._wait_for_job_update(job_desc, 'FAILED:', seconds)

    def _wait_for_job_update(self, job_desc, status, seconds, max_num_non_matching_events=10):
        """Read listener log lines until one reports ``status`` for ``job_desc``.

        Args:
            job_desc: dict of key/value pairs the decoded job must contain.
            status: line prefix to look for, e.g. ``'COMPLETE:'``.
            seconds: timeout passed to each individual ``select`` call (it is
                per log line, not a total budget).
            max_num_non_matching_events: number of non-matching log lines
                tolerated before giving up.

        Returns:
            The decoded job from the first matching line.

        Raises:
            Exception: if no line arrives within ``seconds``, if the listener's
                stderr reaches end-of-file, or if the non-matching budget is
                exhausted without a match.
        """
        stderr = self.process.stderr
        for _attempt in range(max_num_non_matching_events):
            readable, _, _ = select([stderr], [], [], seconds)
            if not readable:
                raise Exception('timed out waiting for {0} {1}'.format(status, job_desc))
            raw_line = stderr.readline()
            if not raw_line:
                # End-of-file: no further event can arrive, so fail now rather
                # than counting empty reads against the budget.
                raise Exception('listener closed stderr while waiting for {0} {1}'.format(status, job_desc))
            # Drop the first two space-separated fields (log date and time).
            line = " ".join(raw_line.split(' ')[2:])
            if debug:
                print(line)
            if line.startswith(status):
                job = cjson.decode(line[len(status):])
                # An event for another job may lack some keys; treat that as a
                # non-match rather than a KeyError.
                if all(k in job and job[k] == job_desc[k] for k in job_desc):
                    return job
        # Without this, callers would carry on as if the event had been seen.
        raise Exception('gave up waiting for {0} {1} after {2} non-matching events'.format(
            status, job_desc, max_num_non_matching_events))
def temporary_file_name():
    """Return a fresh temporary file path.

    In debug mode the file is created with ``mkstemp`` and left on disk, and
    its path is printed. Otherwise a ``NamedTemporaryFile`` is created and
    closed straight away, which deletes it, so only the path is returned.
    """
    if debug:
        fd, tmpfilename = tempfile.mkstemp()
        # Only the path is needed; close the descriptor so it is not leaked.
        os.close(fd)
        print('temporary file: %s' % tmpfilename)
        return tmpfilename
    # Leaving the with-block closes and deletes the placeholder file now,
    # instead of whenever the object happens to be garbage collected.
    with tempfile.NamedTemporaryFile() as tmpfile:
        return tmpfile.name
def glow_executable():
    """Return the path of the ``glow`` binary located in this file's directory."""
    return '/'.join((HERE, 'glow'))
def tubes():
    """Run ``glow -stats`` and return its stdout decoded with cjson."""
    stats_command = [glow_executable(), '-stats']
    raw_stats = subprocess.check_output(stats_command)
    return cjson.decode(raw_stats)
def drain(tube):
    """Run ``glow -drain <tube>``; a non-zero exit raises CalledProcessError."""
    drain_command = [glow_executable(), '-drain']
    drain_command.append(tube)
    subprocess.check_call(drain_command)
if __name__ == '__main__':
    # Running the file directly uses the stdlib runner; this works, but nose
    # is the better way to run the suite.
    unittest.main()