Skip to content

Commit 52b1215

Browse files
feat(api): initial version of check-delegation management command
1 parent 0cc1b4d commit 52b1215

File tree

1 file changed

+138
-0
lines changed

1 file changed

+138
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
from functools import cache
2+
from socket import getaddrinfo
3+
4+
from django.conf import settings
5+
from django.core.management import BaseCommand
6+
import dns.exception, dns.message, dns.name, dns.query, dns.resolver
7+
8+
from desecapi.models import Domain
9+
10+
11+
LPS = {dns.name.from_text(lps) for lps in settings.LOCAL_PUBLIC_SUFFIXES}
12+
SERVER = "8.8.8.8"
13+
14+
15+
@cache
16+
def lookup(target):
17+
try:
18+
addrinfo = getaddrinfo(str(target), None)
19+
except OSError:
20+
addrinfo = []
21+
return {v[-1][0] for v in addrinfo}
22+
23+
24+
class Command(BaseCommand):
25+
help = "Check delegation status."
26+
27+
def __init__(self, *args, **kwargs):
28+
self.our_ns_set = {dns.name.from_text(ns) for ns in settings.DEFAULT_NS}
29+
self.our_ip_set = set.union(*(lookup(ns) for ns in self.our_ns_set))
30+
self.resolver = dns.resolver.Resolver()
31+
super().__init__(*args, **kwargs)
32+
33+
def add_arguments(self, parser):
34+
parser.add_argument(
35+
"domain-name",
36+
nargs="*",
37+
help="Domain name to check. If omitted, will check all domains not registered under a local public suffix.",
38+
)
39+
40+
def handle_domain(self, domain):
41+
# Identify parent
42+
domain_name = dns.name.from_text(domain.name)
43+
parent = domain_name.parent()
44+
while len(parent):
45+
query = dns.message.make_query(parent, dns.rdatatype.NS)
46+
try:
47+
res = dns.query.udp(query, SERVER, timeout=5)
48+
except:
49+
res = dns.query.tcp(query, SERVER, timeout=5)
50+
if res.answer:
51+
break
52+
parent = parent.parent()
53+
54+
# Find delegation NS hostnames and IP addresses
55+
try:
56+
ns = res.find_rrset(res.answer, parent, dns.rdataclass.IN, dns.rdatatype.NS)
57+
except KeyError:
58+
raise dns.resolver.NoNameservers
59+
ipv4 = set()
60+
ipv6 = set()
61+
for rr in ns:
62+
ipv4 |= {ip for ip in lookup(rr.target) if "." in ip}
63+
ipv6 |= {ip for ip in lookup(rr.target) if "." not in ip}
64+
65+
self.resolver.nameserver = list(ipv4) + list(ipv6)
66+
try:
67+
answer = dns.resolver.resolve(domain_name, dns.rdatatype.NS)
68+
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN):
69+
domain.is_registered = False
70+
return
71+
domain.is_registered = True
72+
73+
# Compute overlap of delegation NS hostnames and IP addresses with ours
74+
ns_intersection = self.our_ns_set & {name.target for name in answer}
75+
domain.has_all_nameservers = ns_intersection == self.our_ns_set
76+
77+
ns_ip_intersection = self.our_ip_set & set.union(
78+
*(lookup(rr.target) for rr in answer)
79+
)
80+
# .is_delegated: None means "not delegated to deSEC", False means "partial", True means "fully"
81+
if not ns_ip_intersection:
82+
domain.is_delegated = None
83+
else:
84+
domain.is_delegated = ns_ip_intersection == self.our_ip_set
85+
86+
# Find delegation DS records
87+
if ns_ip_intersection:
88+
query = dns.message.make_query(domain_name, dns.rdatatype.DS)
89+
try:
90+
res = dns.query.udp(query, "8.8.8.8", timeout=5)
91+
except:
92+
res = dns.query.tcp(query, "8.8.8.8", timeout=5)
93+
try:
94+
ds = res.find_rrset(
95+
res.answer, domain_name, dns.rdataclass.IN, dns.rdatatype.DS
96+
)
97+
except KeyError:
98+
ds = set()
99+
ds = {rr.to_text() for rr in ds}
100+
101+
# Compute overlap of delegation DS records with ours
102+
our_ds_set = set()
103+
for key in domain.keys:
104+
# Only digest type 2 is mandatory to implement; delegation only fully set up if present
105+
our_ds_set |= {ds for ds in key["ds"] if ds.split(" ")[2] == "2"}
106+
ds_intersection = our_ds_set & ds
107+
# .is_secured: None means "not secured with deSEC", False means "partial", True means "fully"
108+
if not ds_intersection:
109+
domain.is_secured = None
110+
else:
111+
domain.is_secured = ds_intersection == our_ds_set
112+
113+
def handle(self, *args, **options):
114+
qs = Domain.objects
115+
if options["domain-name"]:
116+
qs = qs.filter(
117+
name__in=[name.rstrip(".") for name in options["domain-name"]]
118+
)
119+
for domain in qs.all():
120+
if domain.is_locally_registrable:
121+
continue
122+
123+
try:
124+
self.handle_domain(domain)
125+
except dns.resolver.LifetimeTimeout:
126+
print(f"{domain.name} Timeout")
127+
continue
128+
except dns.resolver.NoNameservers:
129+
print(f"{domain.name} Unresponsive")
130+
continue
131+
if domain.is_registered and domain.is_delegated is not None:
132+
print(
133+
f"{domain.owner.email} {domain.name} {domain.has_all_nameservers=} {domain.is_secured=}"
134+
)
135+
else:
136+
print(
137+
f"{domain.owner.email} {domain.name} {domain.is_registered=} delegated=False"
138+
)

0 commit comments

Comments
 (0)