diff --git a/edict/backend/app/channels/base.py b/edict/backend/app/channels/base.py index 429ab059..1ce2a9f7 100644 --- a/edict/backend/app/channels/base.py +++ b/edict/backend/app/channels/base.py @@ -1,5 +1,7 @@ from __future__ import annotations +import ipaddress +import socket from typing import Protocol, ClassVar from abc import abstractmethod @@ -32,4 +34,49 @@ def _extract_domain(cls, url: str) -> str: parsed = urlparse(url) return parsed.netloc.lower() except Exception: - return '' \ No newline at end of file + return '' + + @classmethod + def _is_public_host(cls, url: str) -> bool: + """Return True only if the URL host resolves exclusively to public IPs. + + Blocks SSRF vectors: loopback, link-local (incl. 169.254.169.254 cloud + metadata), private (RFC1918), multicast, reserved and unspecified + addresses, plus IPv6 equivalents. + """ + try: + from urllib.parse import urlparse + parsed = urlparse(url) + except Exception: + return False + host = parsed.hostname + if not host: + return False + # Reject hosts that are already raw IP literals pointing at unsafe ranges + # plus any DNS name that resolves to one. We must check *all* resolved + # addresses, not just the first, to avoid bypass via multi-A records. + try: + infos = socket.getaddrinfo(host, None) + except socket.gaierror: + return False + if not infos: + return False + for info in infos: + sockaddr = info[4] + ip_str = sockaddr[0] + try: + ip = ipaddress.ip_address(ip_str) + except ValueError: + return False + if (ip.is_private or ip.is_loopback or ip.is_link_local + or ip.is_multicast or ip.is_reserved + or ip.is_unspecified): + return False + # Additionally block IPv4-mapped/compatible IPv6 to private space + if isinstance(ip, ipaddress.IPv6Address) and ip.ipv4_mapped is not None: + mapped = ip.ipv4_mapped + if (mapped.is_private or mapped.is_loopback or mapped.is_link_local + or mapped.is_multicast or mapped.is_reserved + or mapped.is_unspecified): + return False + return True diff --git a/edict/backend/app/channels/webhook.py b/edict/backend/app/channels/webhook.py index 88843677..40cfcb05 100644 --- a/edict/backend/app/channels/webhook.py +++ b/edict/backend/app/channels/webhook.py @@ -17,10 +17,21 @@ class WebhookChannel(NotificationChannel): @classmethod def validate_webhook(cls, webhook: str) -> bool: - return cls._validate_url_scheme(webhook) + # Require HTTPS and that the destination resolves to a public IP only. + # Without this SSRF guard, the generic webhook channel would happily + # POST to internal services (e.g. http(s)://169.254.169.254/, RFC1918 + # hosts, localhost) when an operator pastes such a URL into the + # notification config. + if not cls._validate_url_scheme(webhook): + return False + return cls._is_public_host(webhook) @classmethod def send(cls, webhook: str, title: str, content: str, url: str | None = None) -> bool: + # Re-validate at send time as a defence-in-depth measure against + # config tampering / DNS rebinding between validation and send. + if not cls.validate_webhook(webhook): + return False payload = json.dumps({ 'title': title, 'content': content, diff --git a/tests/test_webhook_ssrf.py b/tests/test_webhook_ssrf.py new file mode 100644 index 00000000..fdfda17c --- /dev/null +++ b/tests/test_webhook_ssrf.py @@ -0,0 +1,71 @@ +"""PoC test for CWE-918 SSRF in WebhookChannel.""" +from __future__ import annotations + +import sys +from pathlib import Path +from unittest import mock + +ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(ROOT / "edict" / "backend")) + +from app.channels.webhook import WebhookChannel # noqa: E402 + + +def _patch_dns(host_to_ip): + def fake_getaddrinfo(host, *_a, **_k): + ip = host_to_ip.get(host) + if ip is None: + import socket + raise socket.gaierror('mock') + return [(2, 1, 6, '', (ip, 0))] + return mock.patch('app.channels.base.socket.getaddrinfo', + side_effect=fake_getaddrinfo) + + +def test_blocks_loopback(): + with _patch_dns({'127.0.0.1': '127.0.0.1', 'localhost': '127.0.0.1'}): + assert WebhookChannel.validate_webhook('https://127.0.0.1/x') is False + assert WebhookChannel.validate_webhook('https://localhost/x') is False + + +def test_blocks_cloud_metadata(): + with _patch_dns({'169.254.169.254': '169.254.169.254', + 'metadata.google.internal': '169.254.169.254'}): + assert WebhookChannel.validate_webhook('https://169.254.169.254/latest/meta-data/') is False + assert WebhookChannel.validate_webhook('https://metadata.google.internal/') is False + + +def test_blocks_rfc1918(): + with _patch_dns({'10.0.0.5': '10.0.0.5', + '192.168.1.1': '192.168.1.1', + '172.16.0.1': '172.16.0.1'}): + assert WebhookChannel.validate_webhook('https://10.0.0.5/x') is False + assert WebhookChannel.validate_webhook('https://192.168.1.1/x') is False + assert WebhookChannel.validate_webhook('https://172.16.0.1/x') is False + + +def test_blocks_http_scheme(): + assert WebhookChannel.validate_webhook('http://example.com/x') is False + + +def test_allows_public_host(): + with _patch_dns({'example.com': '93.184.216.34'}): + assert WebhookChannel.validate_webhook('https://example.com/hook') is True + + +def test_send_refuses_unsafe_url(): + with _patch_dns({'127.0.0.1': '127.0.0.1'}): + with mock.patch('app.channels.webhook.urlopen') as uo: + ok = WebhookChannel.send('https://127.0.0.1/x', 't', 'c') + assert ok is False + uo.assert_not_called() + + +if __name__ == '__main__': + test_blocks_loopback() + test_blocks_cloud_metadata() + test_blocks_rfc1918() + test_blocks_http_scheme() + test_allows_public_host() + test_send_refuses_unsafe_url() + print('OK')