-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest.py
147 lines (117 loc) · 3.52 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import subprocess
import sys
import signal
import time
import os.path
from contextlib import contextmanager
from subprocess import Popen, PIPE, STDOUT
from os import path
from time import sleep
@contextmanager
def timeout(time):
signal.signal(signal.SIGALRM, raise_timeout)
signal.alarm(time)
try:
yield
except TimeoutError:
pass
finally:
signal.signal(signal.SIGALRM, signal.SIG_IGN)
def raise_timeout(signum, frame):
raise TimeoutError
def make_target(target):
subprocess.run(["make " + target], shell=True)
return path.exists(target)
def make_clean():
subprocess.run(["make clean"], shell=True)
def exit_if_condition(condition, message):
if condition:
print(message)
make_clean()
quit()
class Process:
def __init__(self, command):
self.command = command
self.started = False
def start(self):
try:
self.proc = Popen(self.command, universal_newlines=True, stdin=PIPE, stdout=PIPE, stderr=PIPE)
self.started = True
except FileNotFoundError as e:
print(e)
quit()
def finish(self):
if self.started:
self.proc.terminate()
self.proc.wait(timeout=0.2)
def send_input(self, proc_in):
if self.started:
self.proc.stdin.write(proc_in + "\n")
self.proc.stdin.flush()
def get_output(self):
if self.started:
return self.proc.stdout.readline()
else:
return ""
def get_output_timeout(self, tout):
if self.started:
with timeout(tout):
try:
return self.proc.stdout.readline()
except TimeoutError as e:
return "timeout"
else:
return ""
def is_alive(self):
if self.started:
return self.proc.poll() is None
else:
return False
def tcp_test():
# clean up
make_clean()
# run make
exit_if_condition(not make_target("client"), "Client could not be built")
exit_if_condition(not make_target("server"), "Server could not be built")
# create the process objects
server = Process(["./server"])
client = Process(["./client"])
# start the server and check that it is up
server.start()
sleep(1)
exit_if_condition(not server.is_alive(), "Server is not up")
# check that the server prints the initial message
outs = server.get_output_timeout(3)
exit_if_condition(outs == "timeout", "Server did not print in time")
# start the client and check that it is up
client.start()
sleep(1)
exit_if_condition(not client.is_alive(), "Client is not up")
# check that the client prints the initial message
outc = client.get_output_timeout(3)
exit_if_condition(outc == "timeout", "Client did not print in time")
# send a newline in the client
client.send_input("")
# check that the server receives the connection from the client
outs = server.get_output_timeout(3)
exit_if_condition(outs == "timeout", "Server did not receive the connection")
print(outs)
# check that the client prints the second message
outc = client.get_output_timeout(3)
exit_if_condition(outc == "timeout", "Client did not print in time")
# send a newline in the client
client.send_input("")
# check that the server receives the data from the client
outs = server.get_output_timeout(3)
exit_if_condition(outs == "timeout", "Server did not receive the data")
print(outs)
# check that the client and the server have exited
sleep(2)
exit_if_condition(client.is_alive(), "Client is still up")
exit_if_condition(server.is_alive(), "Server is still up")
# stop the processes
server.finish()
client.finish()
# clean up
make_clean()
tcp_test()