-
Notifications
You must be signed in to change notification settings - Fork 1
/
communication.py
180 lines (157 loc) · 6.55 KB
/
communication.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
import json
import select
import socket
import time
from threading import Thread
import os
from config import Config
from connection import Connection
from server import Server
from utils.send_packet import send_packet
# Discovery handshake payloads. Both carry this node's configured name:
# "hello" is what udp_broadcast() sends, "aleykumselam" is the reply that
# udp_listen() sends back to whoever said hello.
hello_template = dict(type="hello", myname=Config().NAME)
aleykumselam_template = dict(type="aleykumselam", myname=Config().NAME)
def udp_broadcast():
    """Announce this node by broadcasting the "hello" message five times over UDP.

    The broadcast address is built from ``Config().MY_IP`` by replacing its
    last octet with ``255``, so a /24 network layout is assumed. Every attempt
    is independent: a failure is printed and the remaining attempts still run.
    """
    for _ in range(5):
        try:
            print("Starting up, broadcasting hello message!")
            message = json.dumps(hello_template)
            broadcast_ip = ".".join(Config().MY_IP.split(".")[0:3]) + ".255"
            # Close the socket after each attempt so repeated broadcasts do
            # not leave unclosed descriptors behind.
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
                sock.bind(("", 0))
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
                sock.sendto(message.encode("utf-8"), (broadcast_ip, Config().CONTROL_PORT))
        except Exception as e:
            print("Udp broadcast failed!", e)
def udp_listen():
    """Answer UDP "hello" datagrams on the control port and record the senders.

    Runs forever. The socket is non-blocking, so ``select`` is used to wait
    until a datagram is available. Datagrams that are empty, that come from
    this node's own IP, or whose ``type`` is not ``"hello"`` are ignored.
    For a valid hello, the "aleykumselam" reply is handed to ``send_packet``
    and the sender's name is stored under its IP in
    ``Connection().connected_ips``. Any error is printed and the loop goes on.
    """
    print("Listening for incoming udp messages")
    listener = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    listener.bind(("", Config().CONTROL_PORT))
    listener.setblocking(0)
    while True:
        try:
            readable, _, _ = select.select([listener], [], [])
            payload, peer = readable[0].recvfrom(1024)
            sender_ip = peer[0]
            if not payload or sender_ip == Config().MY_IP:
                continue

            hello = json.loads(payload.strip().decode("utf-8"))
            if hello["type"] != "hello":
                continue
            peer_name = hello["myname"]
            print(f"{peer_name} says hello!")

            # Reply with our own greeting.
            reply = json.dumps(aleykumselam_template).encode("utf-8")
            send_packet(sender_ip, Config().CONTROL_PORT, reply)

            # Remember the peer.
            Connection().connected_ips[sender_ip] = peer_name
            print("Current addresses: ", Connection().connected_ips)
        except Exception as err:
            print("What the hack is this, udp?", err)
def tcp_listen():
    """Serve the TCP control port forever, handling one short message per connection.

    Each accepted connection is read once (up to 1024 bytes) and dispatched on
    the first character of the stripped text:

    * ``_`` -- latency probe: a single ``_`` is sent back immediately.
    * ``#`` -- delay report of the form ``#<receiver_name>#<delay>``. It is
      only accepted when ``Config().NAME`` is ``'server'``; the reporting
      peer's name is looked up by its IP in ``Connection().connected_ips``.
    * anything else is parsed as JSON; a message of type ``aleykumselam``
      registers the sender's name under its IP.

    Errors raised while handling a connection are printed and the loop
    continues with the next connection.
    """
    print("Listening for incoming tcp messages")
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((Config().MY_IP, Config().CONTROL_PORT))
        s.listen()
        while True:
            try:
                conn, addr = s.accept()
                with conn:
                    line = conn.recv(1024)
                    sender_ip = addr[0]
                    if not line:
                        continue
                    message = line.strip().decode("utf-8")
                    if not message:
                        # Whitespace-only payload: there is no first character
                        # to dispatch on, so report it like any other junk.
                        print("Invalid message!")
                        continue
                    if message[0] == '_':  # probe: respond as soon as possible
                        conn.sendall("_".encode())
                        continue
                    if message[0] == '#':  # a client has sent a delay report
                        if Config().NAME != 'server':
                            print('Go away!')
                            continue
                        client_name = Connection().connected_ips[sender_ip]
                        _, receiver_name, delay = message.split('#')
                        # sync_delay() passes the server's own measurements to
                        # the sync manager as floats; convert the wire text so
                        # both paths supply the same type. A malformed number
                        # raises ValueError and is reported by the outer handler.
                        Server().syncManager.update(client_name, receiver_name, float(delay))
                        continue
                    try:
                        message = json.loads(message)
                    except ValueError:  # json.JSONDecodeError is a ValueError
                        print("Invalid message!")
                        continue
                    if message["type"] == "aleykumselam":
                        print(f"{message['myname']} says aleykumselam")
                        # save the ip address in a dictionary
                        Connection().connected_ips[sender_ip] = message["myname"]
                    else:
                        print("Invalid message type!")
            except Exception as e:
                print("What the hack is this, tcp?", str(e))
                continue
def send_probe(ip_of_client):
    """Return the average duration of a connect-and-echo exchange with a peer.

    Three probes are made, 0.1 s apart. Each probe opens a fresh TCP
    connection to the peer's control port, sends ``_`` and waits for a reply.
    The timed span covers socket creation, connect, send and receive.

    Args:
        ip_of_client: IP address of the peer to probe.

    Returns:
        Mean elapsed time per probe, in seconds.

    Raises:
        OSError: if connecting, sending or receiving fails.
    """
    total = 0
    probe_count = 3
    for _ in range(probe_count):
        first_time = time.perf_counter()
        # The context manager closes each probe socket; without it every
        # probe would leave an unclosed connection behind.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.connect((ip_of_client, Config().CONTROL_PORT))
            sock.sendall("_".encode())  # tcp_listen() only looks at the leading "_"
            sock.recv(1024)
            # Stop the clock before the socket is closed so the measured
            # span stays connect + send + receive.
            second_time = time.perf_counter()
        total += second_time - first_time
        time.sleep(0.1)
    return total / probe_count
def sync_delay():
    """Periodically measure the delay to every known peer and publish the results.

    Runs forever, one round per second. In each round every peer except the
    one named ``'server'`` is probed with ``send_probe``; half of the average
    probe time, converted to milliseconds, is taken as the one-way delay.
    When this node is the server the value goes straight into the sync
    manager, otherwise it is reported with ``send_to_server``. On the server,
    every fifth round additionally calls ``Server().syncManager.solve()``.

    A failure for one peer is printed and does not stop the remaining peers
    from being probed in the same round.
    """
    round_no = 0
    while True:
        round_no += 1
        try:
            # Snapshot the name -> ip mapping: the listener threads add peers
            # concurrently, and iterating the live mapping can fail mid-round.
            peers = list(Connection().connected_ips.inv.items())
        except Exception as e:
            print('sync_delay error', e)
            peers = []

        for client_name, ip_of_client in peers:
            if client_name == 'server':
                continue  # everybody probes everybody, but no one probes the server
            try:
                avg = send_probe(ip_of_client)
                me_to_client_delay = avg * 1000 / 2  # round trip in s -> one way in ms
                # send to server if necessary
                if Config().NAME == 'server':
                    Server().syncManager.update('server', client_name, me_to_client_delay)
                else:
                    send_to_server(client_name, me_to_client_delay)
            except Exception as e:
                # Handled per peer so one unreachable peer does not abort the round.
                print('sync_delay error', e)

        if round_no % 5 == 0 and Config().NAME == 'server':
            Server().syncManager.solve()
        time.sleep(1)
def send_to_server(other, delay):
    """Report a measured delay towards ``other`` to the server node.

    Sends the text ``#<other>#<delay>`` over a new TCP connection to the
    control port of the peer registered under the name ``'server'``.

    Args:
        other: name of the peer the delay was measured against.
        delay: the measured delay; it is formatted into the message text.

    Raises:
        Exception: if no peer named ``'server'`` is currently known.
        OSError: if connecting or sending fails.
    """
    peers_by_name = Connection().connected_ips.inv
    # Check before creating the socket so the error path allocates nothing.
    if 'server' not in peers_by_name.keys():
        raise Exception('Server is not connected!')
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.connect((peers_by_name['server'], Config().CONTROL_PORT))
        # The leading '#' marks this as a delay report for tcp_listen().
        sock.sendall(f"#{other}#{delay}".encode())
def udp_broadcast_interval():
    """Run the hello broadcast once and return.

    NOTE(review): despite the name, nothing is repeated. The earlier body
    wrapped the call in ``while True`` followed by a 60 s sleep, but an
    unconditional ``return`` directly after the first broadcast made both the
    loop and the sleep unreachable. That dead code is removed here and the
    effective behaviour (a single broadcast round) is kept; reinstate the loop
    if periodic announcements are actually wanted.
    """
    udp_broadcast()
def _remove_stale_files(directory, extension, now, max_age=60):
    """Delete files in *directory* named ``<timestamp><extension>`` older than *max_age* seconds."""
    for name in os.listdir(directory):
        if not extension or not name.endswith(extension):
            continue
        try:
            created = float(name[:-len(extension)])
        except ValueError:
            # Not one of our timestamp-named files; leave it alone.
            continue
        if now - created > max_age:
            try:
                os.remove(os.path.join(directory, name))
            except OSError as e:
                # e.g. the file vanished in the meantime; keep cleaning the rest.
                print('cleaner error', e)


def cleaner():
    """Once a minute, delete timestamp-named files in ``temp`` older than 60 seconds.

    Only files ending in ``Config().FILE_EXTENSION`` are considered; the part
    of the name before the extension is read as a ``time.time()`` value. Runs
    forever: a failure in one pass (for example a missing ``temp`` directory)
    is printed and the next pass is attempted after the usual pause, instead
    of ending the cleaner for good.
    """
    while True:
        try:
            _remove_stale_files('temp', Config().FILE_EXTENSION, time.time())
        except Exception as e:
            print('cleaner error', e)
        time.sleep(60)  # every minute
def startup():
    """Start the background workers, each on its own thread.

    The threads are started in this order and are not joined: TCP listener,
    UDP listener, hello broadcaster, delay synchroniser, temp-file cleaner.
    """
    workers = (
        tcp_listen,
        udp_listen,
        udp_broadcast_interval,
        sync_delay,
        cleaner,
    )
    for worker in workers:
        Thread(target=worker).start()