This repository was archived by the owner on May 22, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwebserver.py
150 lines (117 loc) · 4.32 KB
/
webserver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import base64
import binascii
import http.server
import ipaddress
import signal
import sys
import threading
import traceback
from urllib.parse import urlparse, parse_qs
import dnsupdater
VERSION = "0.1"
ACCEPTED_AUTH = "Basic"
def main():
if len(sys.argv) < 2:
print("No port given for the webserver, using 8080", file=sys.stderr)
port = 8080
else:
try:
port = int(sys.argv[1])
except Exception as e:
raise RuntimeError("Something was wrong with the port: " + sys.argv[1]) from e
httpd = http.server.HTTPServer(("", port), RequestHandler)
print("Serving at port", port)
main_thread = threading.get_ident()
def sigterm_handler(signum, frame):
print("Shutting down...")
signal.pthread_kill(main_thread, signal.SIGINT)
signal.signal(signal.SIGTERM, sigterm_handler)
try:
httpd.serve_forever()
except KeyboardInterrupt:
httpd.server_close()
finally:
print("Server shut down")
class RequestHandler(http.server.BaseHTTPRequestHandler):
def __init__(self, request, client_address, server):
super().__init__(request, client_address, server)
def version_string(self):
return "RS485-DDNS/" + VERSION + " " + sys.version
def _send_head(self):
auth_header = self.headers['Authorization']
if auth_header is None:
self._send_unauthorized()
return
if not auth_header.startswith(ACCEPTED_AUTH + " "):
self._send_unauthorized("Unknown authorization method")
return
encoded_auth = auth_header[len(ACCEPTED_AUTH) + 1:]
try:
auth_string = base64.standard_b64decode(encoded_auth).decode()
except (binascii.Error, ValueError, UnicodeDecodeError):
self.send_error(400, "Error when decoding authentication string")
return
splitter = auth_string.find(':')
if splitter == -1:
self.send_error(400, "Error when decoding authentication string")
return
username = auth_string[:splitter]
password = auth_string[splitter + 1:]
if username == "user" and password == "pass":
self._parse_path()
else:
self._send_unauthorized()
def _send_unauthorized(self, message=None):
self.send_response(401, message)
self.send_header('WWW-Authenticate', ACCEPTED_AUTH)
self.end_headers()
# no body
def _parse_path(self):
parsedurl = urlparse(self.path)
if parsedurl.path != "/update":
self.send_error(404)
return
try:
query_dict = parse_qs(parsedurl.query)
except Exception:
self.send_error(400, "Could not parse query")
return
if 'domain' not in query_dict:
self.send_error(400, "No domain given")
return
if 'ipaddr' not in query_dict and 'ip6addr' not in query_dict:
self.send_error(400, "No ip address given")
return
ip4 = query_dict.get('ipaddr', (None,))[0]
ip6 = query_dict.get('ip6addr', (None,))[0]
if ip4 is not None:
try:
ip4 = ipaddress.IPv4Address(ip4)
except ipaddress.AddressValueError:
self.send_error(400, "IPv4 address invalid")
if ip6 is not None:
try:
ip6 = ipaddress.IPv6Address(ip6)
except ipaddress.AddressValueError:
self.send_error(400, "IPv6 address invalid")
try:
updater.set_record_for_domain(query_dict['domain'][0], ip4, ip6)
except dnsupdater.UnknownDomainError:
self.send_error(404, "Given domain not found")
return
except dnsupdater.UnconfigurableDomainError:
self.send_error(409, "Given domain is permanently unconfigurable")
return
except dnsupdater.TemporarilyUnconfigurableError:
traceback.print_exc(file=sys.stderr)
self.send_error(503, "Given domain is temporarily unconfigurable")
return
self.send_response(200, "DNS updated")
self.end_headers()
def do_GET(self):
self._send_head()
def do_HEAD(self):
self._send_head()
if __name__ == "__main__":
updater = dnsupdater.BindUpdater()
main()