diff --git a/README.md b/README.md index 132ea1c9b..79711323b 100644 --- a/README.md +++ b/README.md @@ -334,6 +334,7 @@ While there are certainly many ways to get started hacking desec-stack, here is source venv/bin/activate pip install wheel pip install -r requirements.txt + pip install -r requirements-test.txt 1. At this point, Django is ready to run in the virtual environment created above. There are two things to consider when running Django outside the container. @@ -359,10 +360,9 @@ While there are certainly many ways to get started hacking desec-stack, here is docker compose -f docker-compose.yml -f docker-compose.test-api.yml up -d dbapi - Finally, you can manage Django using the `manage.py` CLI. - As an example, to run the tests, use + To run the tests, in `/api` use - python3 manage.py test + pytest 1. Open the project root directory `desec-stack` in PyCharm and select File › Settings. 1. In Project: desec-stack › Project Structure, mark the `api/` folder as a source folder. diff --git a/api/Dockerfile b/api/Dockerfile index 420210746..cd49146f2 100644 --- a/api/Dockerfile +++ b/api/Dockerfile @@ -11,11 +11,12 @@ WORKDIR /usr/src/app ENV PIP_DISABLE_PIP_VERSION_CHECK=1 ENV PIP_NO_CACHE_DIR=1 -COPY requirements.txt /usr/src/app/ +COPY requirements*.txt /usr/src/app/ # freetype-dev is needed for captcha generation RUN apk add --no-cache gcc freetype-dev libffi-dev musl-dev libmemcached-dev postgresql-dev jpeg-dev zlib-dev RUN pip install --upgrade pip \ && pip install -r requirements.txt \ + && pip install -r requirements-test.txt \ && pip freeze RUN mkdir /root/cronhook diff --git a/api/api/settings.py b/api/api/settings.py index a16896a2d..825251475 100644 --- a/api/api/settings.py +++ b/api/api/settings.py @@ -160,6 +160,14 @@ ] DESECSTACK_DOMAIN = os.environ["DESECSTACK_DOMAIN"] +RESOLVERS = [ + "9.9.9.9", + "2620:fe::fe", # Quad9 + "1.1.1.1", + "2606:4700:4700::1111", # Cloudflare + "8.8.8.8", + "2001:4860:4860::8888", # Google +] # default NS records DEFAULT_NS = [name + "." for name in os.environ["DESECSTACK_NS"].strip().split()] diff --git a/api/desecapi/migrations/0045_domain_delegation_status_and_more.py b/api/desecapi/migrations/0045_domain_delegation_status_and_more.py new file mode 100644 index 000000000..01855829b --- /dev/null +++ b/api/desecapi/migrations/0045_domain_delegation_status_and_more.py @@ -0,0 +1,72 @@ +# Generated by Django 5.1.8 on 2025-04-12 15:02 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("desecapi", "0044_alter_captcha_created_alter_domain_renewal_state_and_more"), + ] + + operations = [ + migrations.AddField( + model_name="domain", + name="delegation_status", + field=models.IntegerField( + blank=True, + choices=[ + (0, "Not Delegated"), + (1, "Elsewhere"), + (2, "Partial"), + (3, "Exclusive"), + (4, "Multi"), + (128, "Error Nxdomain"), + (129, "Error No Answer"), + (130, "Error No Nameservers"), + (131, "Error Timeout"), + ], + default=None, + null=True, + ), + ), + migrations.AddField( + model_name="domain", + name="delegation_status_changed", + field=models.DateTimeField(blank=True, default=None, null=True), + ), + migrations.AddField( + model_name="domain", + name="delegation_status_touched", + field=models.DateTimeField(blank=True, default=None, null=True), + ), + migrations.AddField( + model_name="domain", + name="security_status", + field=models.IntegerField( + blank=True, + choices=[ + (0, "Insecure"), + (1, "Foreign Keys"), + (2, "Secure Exclusive"), + (3, "Secure"), + (128, "Error Nxdomain"), + (129, "Error No Answer"), + (130, "Error No Nameservers"), + (131, "Error Timeout"), + ], + default=None, + null=True, + ), + ), + migrations.AddField( + model_name="domain", + name="security_status_changed", + field=models.DateTimeField(blank=True, default=None, null=True), + ), + migrations.AddField( + model_name="domain", + name="security_status_touched", + field=models.DateTimeField(blank=True, default=None, null=True), + ), + ] diff --git a/api/desecapi/models/domains.py b/api/desecapi/models/domains.py index cd4f5c488..30862e5aa 100644 --- a/api/desecapi/models/domains.py +++ b/api/desecapi/models/domains.py @@ -1,8 +1,13 @@ from __future__ import annotations -from functools import cached_property - -import dns +from functools import cache, cached_property +from socket import getaddrinfo + +import dns.name +import dns.rdataclass +import dns.rdatatype +import dns.rdtypes +import dns.resolver import psl_dns from django.conf import settings from django.contrib.auth.models import AnonymousUser @@ -10,6 +15,7 @@ from django.db import models from django.db.models import CharField, F, Manager, Q, Value from django.db.models.functions import Concat, Length +from django.utils import timezone from django_prometheus.models import ExportModelOperationsMixin from dns.exception import Timeout from dns.resolver import NoNameservers @@ -23,6 +29,13 @@ psl = psl_dns.PSL(resolver=settings.PSL_RESOLVER, timeout=0.5) +# CHECKING DISABLED general-purpose resolver for queries to the public DNS +resolver_CD = dns.resolver.Resolver(configure=False) +resolver_CD.nameservers = settings.RESOLVERS +resolver_CD.flags = ( + (resolver_CD.flags or 0) | dns.flags.CD | dns.flags.AD | dns.flags.RD +) + class DomainManager(Manager): def filter_qname(self, qname: str, **kwargs) -> models.query.QuerySet: @@ -52,6 +65,25 @@ class RenewalState(models.IntegerChoices): NOTIFIED = 2 WARNED = 3 + class DelegationStatus(models.IntegerChoices): + NOT_DELEGATED = 0 + ELSEWHERE = 1 + PARTIAL = 2 + EXCLUSIVE = 3 + MULTI = 4 + ERROR_NXDOMAIN = 128 + ERROR_NO_NAMESERVERS = 129 + ERROR_TIMEOUT = 130 + + class SecurityStatus(models.IntegerChoices): + INSECURE = 0 + FOREIGN_KEYS = 1 + SECURE_EXCLUSIVE = 2 + SECURE = 3 + ERROR_NXDOMAIN = 128 + ERROR_NO_NAMESERVERS = 130 + ERROR_TIMEOUT = 131 + created = models.DateTimeField(auto_now_add=True) name = models.CharField( max_length=191, unique=True, validators=validate_domain_name @@ -63,6 +95,26 @@ class RenewalState(models.IntegerChoices): choices=RenewalState.choices, db_index=True, default=RenewalState.IMMORTAL ) renewal_changed = models.DateTimeField(auto_now_add=True) + delegation_status = models.IntegerField( + choices=DelegationStatus.choices, + default=None, + null=True, + blank=True, + ) + delegation_status_touched = models.DateTimeField( + default=None, null=True, blank=True + ) + delegation_status_changed = models.DateTimeField( + default=None, null=True, blank=True + ) + security_status = models.IntegerField( + choices=SecurityStatus.choices, + default=None, + null=True, + blank=True, + ) + security_status_touched = models.DateTimeField(default=None, null=True, blank=True) + security_status_changed = models.DateTimeField(default=None, null=True, blank=True) _keys = None objects = DomainManager() @@ -177,6 +229,176 @@ def is_registrable(self): return True + @staticmethod + @cache # located at object-level to start with clear cache for new objects + def _lookup(target) -> set[str]: + try: + addrinfo = getaddrinfo(str(target), None) + except OSError: + return set() + return {v[-1][0] for v in addrinfo} + + def update_dns_delegation_status(self) -> DelegationStatus: + """ + Queries the DNS to determine the delegation status of this domian and + update the delegation status on record. + + The delegation status is evaluated by trying to determine the NS records + as defined in the parent zone to the this `Domain`. + + The parent zone's apex is determined by walking up the DNS tree, querying + NS records until an answer is provided. Once the apex and nameservers are + found, they are queried for the nameservers of `self.name`. + """ + old_delegation_status = self.delegation_status + + our_ns_addrs = { + rr.address + for name in settings.DEFAULT_NS + for rrtype in {"A", "AAAA"} + for rr in list(resolver_CD.resolve(name, rrtype)) + } + + try: + # Determine the parent zone nameservers + ## names + parent_apex_candidate = dns.name.from_text(self.name).parent() + parent_nameservers = [] + while len(parent_apex_candidate) > 1: + parent_nameservers = resolver_CD.resolve( + parent_apex_candidate, dns.rdatatype.NS, raise_on_no_answer=False + ) + if parent_nameservers: + break + else: + parent_apex_candidate = parent_apex_candidate.parent() + + # TODO what if no parnet_nameservers? what if len(parent_apex_candidate) == 0? + + ## addresses + parent_nameservers_addrs = [ + rr.address + for ns in parent_nameservers + for rrtype in {"A", "AAAA"} + for rr in list( + resolver_CD.resolve(ns.target, rrtype, raise_on_no_answer=False) + ) + ] # TODO duplicate with above + + # Determine this zone's nameservers as defined at parent zone nameservers + ## names + resolver = dns.resolver.Resolver(configure=False) + resolver.nameservers = parent_nameservers_addrs + resolver.flags = dns.flags.AD + auth_ns_names = { + rr.target + for rr in resolver.resolve( + self.name, dns.rdatatype.NS, raise_on_no_answer=False + ) + } # FIXME does not work because NS records are located in the AUTHORITY section instead of ANSWER section + + ## addresses + auth_ns_addrs = { + rr.address + for name in auth_ns_names + for rrtype in {"A", "AAAA"} + for rr in list( + resolver_CD.resolve(name, rrtype, raise_on_no_answer=False) + ) + } # TODO duplicate with above + except dns.resolver.NXDOMAIN: + self.delegation_status = self.DelegationStatus.ERROR_NXDOMAIN + except dns.resolver.NoNameservers: + self.delegation_status = self.DelegationStatus.ERROR_NO_NAMESERVERS + except dns.resolver.LifetimeTimeout: + self.delegation_status = self.DelegationStatus.ERROR_TIMEOUT + else: + + if our_ns_addrs == auth_ns_addrs: + # just ours + self.delegation_status = self.DelegationStatus.EXCLUSIVE + elif our_ns_addrs < auth_ns_addrs: + # all of ours, and others + self.delegation_status = self.DelegationStatus.MULTI + elif our_ns_addrs & auth_ns_addrs: + # intersection is non-empty, but not all of our's are included + # some of ours, and others + self.delegation_status = self.DelegationStatus.PARTIAL + elif auth_ns_addrs: + # none of ours, but not empty + self.delegation_status = self.DelegationStatus.ELSEWHERE + elif auth_ns_addrs == set(): + # empty + self.delegation_status = self.DelegationStatus.NOT_DELEGATED + elif auth_ns_addrs is None: + # error + self.delegation_status = self.DelegationStatus + + print( + self.name, + list(parent_nameservers), + list(parent_nameservers_addrs), + auth_ns_names, + auth_ns_addrs, + ) + + now = timezone.now() + self.delegation_status_touched = now + if old_delegation_status != self.delegation_status: + self.delegation_status_changed = now + return self.delegation_status + + def update_dns_security_status(self) -> SecurityStatus: + """Queries the DNS to determine the security status of this domain and + updates the security status on record.""" + old_security_status = self.security_status + + if self.delegation_status not in [ + self.DelegationStatus.MULTI, + self.DelegationStatus.EXCLUSIVE, + ]: + self.security_status = None + return None + + try: + auth_ds = set( + resolver_CD.resolve( + self.name, dns.rdatatype.DS, raise_on_no_answer=False + ) + ) + except dns.resolver.NXDOMAIN: + self.security_status = self.SecurityStatus.ERROR_NXDOMAIN + except dns.resolver.NoNameservers: + self.delegation_status = self.SecurityStatus.ERROR_NO_NAMESERVERS + except dns.resolver.LifetimeTimeout: + self.delegation_status = self.SecurityStatus.ERROR_TIMEOUT + else: + auth_ds = {ds for ds in auth_ds if ds.digest_type == 2} + + # Compute overlap of delegation DS records with ours + our_ds_set = { + dns.rdata.from_text(rdclass="IN", rdtype="DS", tok=ds) + for key in self.keys + for ds in key.get("ds", []) + if dns.rdata.from_text(rdclass="IN", rdtype="DS", tok=ds).digest_type + == 2 # only digest type 2 is mandatory + } + + if our_ds_set == auth_ds: + self.security_status = self.SecurityStatus.SECURE_EXCLUSIVE + elif our_ds_set < auth_ds: + self.security_status = self.SecurityStatus.SECURE + elif auth_ds != set(): + self.security_status = self.SecurityStatus.FOREIGN_KEYS + else: + self.security_status = self.SecurityStatus.INSECURE + + now = timezone.now() + self.security_status_touched = now + if old_security_status != self.security_status: + self.security_status_changed = now + return self.security_status + @property def keys(self): if not self._keys: diff --git a/api/desecapi/tests/test_domains.py b/api/desecapi/tests/test_domains.py index 6316de592..dbfe9219b 100644 --- a/api/desecapi/tests/test_domains.py +++ b/api/desecapi/tests/test_domains.py @@ -4,9 +4,13 @@ from django.core import mail from django.core.exceptions import ValidationError from django.test import override_settings +import dns.name +import dns.rdtypes +import dns.rdtypes.ANY +import pytest from rest_framework import status -from desecapi.models import Domain +from desecapi.models import Domain, User from desecapi.pdns_change_tracker import PDNSChangeTracker from desecapi.tests.base import ( DesecTestCase, @@ -1011,3 +1015,205 @@ def test_filter_qname_invalid(self): "a_B_example", ]: self.assertFalse(Domain.objects.filter_qname(qname)) + + +class TestDnsAuthNs: + """Tests Domain.dns_auth_ns.""" + + def test_apex(self, user): + assert Domain(name="desec.io").dns_auth_ns == { + dns.name.from_text("ns1.desec.io"), + dns.name.from_text("ns2.desec.org"), + } + + def test_non_apex(self): + assert Domain(name="foobar.desec.io").dns_auth_ns == set() + + def test_non_existant(self): + assert Domain(name="something.example").dns_auth_ns == set() + + +class TestDnsAuthDs: + """Tests Domain.dns_auth_ds.""" + + def test_apex(self): + assert Domain(name="desec.io").dns_auth_ds == { + dns.rdata.from_text( + rdclass="IN", + rdtype="DS", + tok="53307 13 2 3f6a815a28593ba6ff5a3e5da9b9af695d8f1022b7e0bda02793c53596da0944", + ), + dns.rdata.from_text( + rdclass="IN", + rdtype="DS", + tok="53307 13 4 156e3dfcceb69a9b2c4f1c769d9fe8558bfbcf5f0475aba5da5924a5f613c1be287529d8da5df773e573a2a08c69ab99", + ), + } + + def test_non_apex(self): + assert Domain(name="foobar.desec.io").dns_auth_ns == set() + + def test_non_existant(self): + assert Domain(name="something.example").dns_auth_ns == set() + + +class TestDnsDelegationStatus: + """Tests Domain.update_dns_delegation_status.""" + + OUR_NS_NAMES = {dns.name.from_text(ns) for ns in settings.DEFAULT_NS} + OUR_NS_NAMES_PARTIAL = set(list(OUR_NS_NAMES)[:1]) + OTHER_NS_NAMES = { + dns.name.from_text("ns1.example"), + dns.name.from_text("ns2.example"), + dns.name.from_text("ns3.example"), + } + + @pytest.fixture + def domain(self) -> Domain: + return Domain(name="foobar.example") + + def test_our_ns_names(self): + assert len(self.OUR_NS_NAMES) >= 2 + + def test_updates_db(self, domain: Domain, mocker): + assert domain.delegation_status is None + assert domain.delegation_status_touched is None + mocker.patch.object(domain, "dns_auth_ns", return_value=self.OUR_NS_NAMES) + domain.update_dns_delegation_status() + assert domain.delegation_status is not None + assert domain.delegation_status_touched is not None + + def test_exclusive(self, domain: Domain, mocker): + mocker.patch.object(domain, "dns_auth_ns", return_value=self.OUR_NS_NAMES) + assert ( + domain.update_dns_delegation_status() == Domain.DelegationStatus.EXCLUSIVE + ) + domain.dns_auth_ns.assert_called_once() + + def test_multi(self, domain: Domain, mocker): + mocker.patch.object( + domain, "dns_auth_ns", return_value=self.OUR_NS_NAMES | self.OTHER_NS_NAMES + ) + assert domain.update_dns_delegation_status() == Domain.DelegationStatus.MULTI + domain.dns_auth_ns.assert_called_once() + + def test_partial_with_others(self, domain: Domain, mocker): + mocker.patch.object( + domain, + "dns_auth_ns", + return_value=self.OUR_NS_NAMES_PARTIAL | self.OTHER_NS_NAMES, + ) + assert domain.update_dns_delegation_status() == Domain.DelegationStatus.PARTIAL + domain.dns_auth_ns.assert_called_once() + + def test_partial(self, domain: Domain, mocker): + mocker.patch.object( + domain, "dns_auth_ns", return_value=self.OUR_NS_NAMES_PARTIAL + ) + assert domain.update_dns_delegation_status() == Domain.DelegationStatus.PARTIAL + domain.dns_auth_ns.assert_called_once() + + def test_elsewhere(self, domain: Domain, mocker): + mocker.patch.object(domain, "dns_auth_ns", return_value=self.OTHER_NS_NAMES) + assert ( + domain.update_dns_delegation_status() == Domain.DelegationStatus.ELSEWHERE + ) + domain.dns_auth_ns.assert_called_once() + + def test_elsewhere(self, domain: Domain, mocker): + mocker.patch.object(domain, "dns_auth_ns", return_value=set()) + assert ( + domain.update_dns_delegation_status() + == Domain.DelegationStatus.NOT_DELEGATED + ) + domain.dns_auth_ns.assert_called_once() + + +class TestDnsSecurityStatus: + """Tests Domain.update_dns_security_status.""" + + @staticmethod + def ds_text2rdata(text: set[str]) -> set[dns.rdtypes.ANY.DS.DS]: + return {dns.rdata.from_text(rdclass="IN", rdtype="DS", tok=tok) for tok in text} + + @staticmethod + def ds_rdata2text(rdata: set[dns.rdtypes.ANY.DS.DS]) -> set[str]: + return {rr.to_text() for rr in rdata} + + OUR_DS = ds_text2rdata( + { + "53307 13 2 3f6a815a28593ba6ff5a3e5da9b9af695d8f1022b7e0bda02793c53596da0944", + "53307 13 4 156e3dfcceb69a9b2c4f1c769d9fe8558bfbcf5f0475aba5da5924a5f613c1be287529d8da5df773e573a2a08c69ab99", + } + ) + SOME_DS = ds_text2rdata( + { + "53307 13 2 6AA22DA11FAEF00937061E39A2D6C174EA300A8B9A78D6199CF8DAAB41F4332E", + "53307 13 4 CE807E76229D64AAE886B2732355BDD68E2C9D1039609085DFE15933FE031C71981A7FA77F45B22192F76724A9B29FB9", + } + ) + + @pytest.fixture + def domain(self): + d = Domain(name="example.example") + d.delegation_status = Domain.DelegationStatus.EXCLUSIVE + d._keys = [{"ds": list(self.ds_rdata2text(self.OUR_DS))}] + return d + + def test_updates_db(self, domain: Domain, mocker): + assert domain.security_status is None + assert domain.security_status_touched is None + mocker.patch.object(domain, "dns_auth_ds", return_value=self.OUR_DS) + domain.update_dns_security_status() + assert domain.security_status is not None + assert domain.security_status_touched is not None + + def test_match_2_and_4(self, domain: Domain, mocker): + mocker.patch.object(domain, "dns_auth_ds", return_value=self.OUR_DS) + assert ( + domain.update_dns_security_status() + == Domain.SecurityStatus.SECURE_EXCLUSIVE + ) + domain.dns_auth_ds.assert_called_once() + + def test_match_2(self, domain: Domain, mocker): + mocker.patch.object( + domain, + "dns_auth_ds", + return_value={rr for rr in self.OUR_DS if rr.digest_type == 2}, + ) + assert ( + domain.update_dns_security_status() + == Domain.SecurityStatus.SECURE_EXCLUSIVE + ) + domain.dns_auth_ds.assert_called_once() + + def test_superset(self, domain: Domain, mocker): + mocker.patch.object( + domain, "dns_auth_ds", return_value=self.OUR_DS | self.SOME_DS + ) + assert domain.update_dns_security_status() == Domain.SecurityStatus.SECURE + domain.dns_auth_ds.assert_called_once() + + def test_no_ds(self, domain: Domain, mocker): + mocker.patch.object(domain, "dns_auth_ds", return_value=set()) + assert domain.update_dns_security_status() == Domain.SecurityStatus.INSECURE + domain.dns_auth_ds.assert_called_once() + + def test_only_digest_type_4_ours(self, domain: Domain, mocker): + mocker.patch.object( + domain, + "dns_auth_ds", + return_value={rr for rr in self.OUR_DS if rr.digest_type == 4}, + ) + assert domain.update_dns_security_status() == Domain.SecurityStatus.INSECURE + domain.dns_auth_ds.assert_called_once() + + def test_only_digest_type_4_some(self, domain: Domain, mocker): + mocker.patch.object( + domain, + "dns_auth_ds", + return_value={rr for rr in self.SOME_DS if rr.digest_type == 4}, + ) + assert domain.update_dns_security_status() == Domain.SecurityStatus.INSECURE + domain.dns_auth_ds.assert_called_once() diff --git a/api/desecapi/tests/test_mail_backends.py b/api/desecapi/tests/test_mail_backends.py index 835605aa4..3ceb90647 100644 --- a/api/desecapi/tests/test_mail_backends.py +++ b/api/desecapi/tests/test_mail_backends.py @@ -18,7 +18,7 @@ }, ) class MultiLaneEmailBackendTestCase(TestCase): - test_backend = settings.EMAIL_BACKEND + test_backend = "django.core.mail.backends.locmem.EmailBackend" def test_lanes(self): debug_params = {"foo": "bar"} diff --git a/api/entrypoint-tests.sh b/api/entrypoint-tests.sh index c1e691ff8..88a1f409c 100755 --- a/api/entrypoint-tests.sh +++ b/api/entrypoint-tests.sh @@ -9,5 +9,5 @@ echo "waiting for dependencies ..." /root/cronhook/start-cron.sh & echo Starting API tests ... -coverage run --source='.' manage.py test -v 3 --noinput +coverage run --source='.' -m pytest coverage report diff --git a/api/pytest.ini b/api/pytest.ini new file mode 100644 index 000000000..e57026f46 --- /dev/null +++ b/api/pytest.ini @@ -0,0 +1,2 @@ +[pytest] +DJANGO_SETTINGS_MODULE = api.settings diff --git a/api/requirements-test.txt b/api/requirements-test.txt new file mode 100644 index 000000000..b36758d77 --- /dev/null +++ b/api/requirements-test.txt @@ -0,0 +1,3 @@ +coverage~=7.8.0 +pytest-django~=4.11.1 +pytest-mock~=3.14.0 diff --git a/api/requirements.txt b/api/requirements.txt index 3fa6af9be..99b57433d 100644 --- a/api/requirements.txt +++ b/api/requirements.txt @@ -1,6 +1,5 @@ captcha~=0.7.1 celery~=5.4.0 -coverage~=7.8.0 cryptography~=44.0.2 Django~=5.1.7 django-cors-headers~=4.7.0