Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 48 additions & 1 deletion edict/backend/app/channels/base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

import ipaddress
import socket
from typing import Protocol, ClassVar
from abc import abstractmethod

Expand Down Expand Up @@ -32,4 +34,49 @@ def _extract_domain(cls, url: str) -> str:
parsed = urlparse(url)
return parsed.netloc.lower()
except Exception:
return ''
return ''

@classmethod
def _is_public_host(cls, url: str) -> bool:
"""Return True only if the URL host resolves exclusively to public IPs.

Blocks SSRF vectors: loopback, link-local (incl. 169.254.169.254 cloud
metadata), private (RFC1918), multicast, reserved and unspecified
addresses, plus IPv6 equivalents.
"""
try:
from urllib.parse import urlparse
parsed = urlparse(url)
except Exception:
return False
host = parsed.hostname
if not host:
return False
# Reject hosts that are already raw IP literals pointing at unsafe ranges
# plus any DNS name that resolves to one. We must check *all* resolved
# addresses, not just the first, to avoid bypass via multi-A records.
try:
infos = socket.getaddrinfo(host, None)
except socket.gaierror:
return False
if not infos:
return False
for info in infos:
sockaddr = info[4]
ip_str = sockaddr[0]
try:
ip = ipaddress.ip_address(ip_str)
except ValueError:
return False
if (ip.is_private or ip.is_loopback or ip.is_link_local
or ip.is_multicast or ip.is_reserved
or ip.is_unspecified):
return False
# Additionally block IPv4-mapped/compatible IPv6 to private space
if isinstance(ip, ipaddress.IPv6Address) and ip.ipv4_mapped is not None:
mapped = ip.ipv4_mapped
if (mapped.is_private or mapped.is_loopback or mapped.is_link_local
or mapped.is_multicast or mapped.is_reserved
or mapped.is_unspecified):
return False
return True
13 changes: 12 additions & 1 deletion edict/backend/app/channels/webhook.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,21 @@ class WebhookChannel(NotificationChannel):

@classmethod
def validate_webhook(cls, webhook: str) -> bool:
return cls._validate_url_scheme(webhook)
# Require HTTPS and that the destination resolves to a public IP only.
# Without this SSRF guard, the generic webhook channel would happily
# POST to internal services (e.g. http(s)://169.254.169.254/, RFC1918
# hosts, localhost) when an operator pastes such a URL into the
# notification config.
if not cls._validate_url_scheme(webhook):
return False
return cls._is_public_host(webhook)

@classmethod
def send(cls, webhook: str, title: str, content: str, url: str | None = None) -> bool:
# Re-validate at send time as a defence-in-depth measure against
# config tampering / DNS rebinding between validation and send.
if not cls.validate_webhook(webhook):
return False
payload = json.dumps({
'title': title,
'content': content,
Expand Down
71 changes: 71 additions & 0 deletions tests/test_webhook_ssrf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"""PoC test for CWE-918 SSRF in WebhookChannel."""
from __future__ import annotations

import sys
from pathlib import Path
from unittest import mock

ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "edict" / "backend"))

from app.channels.webhook import WebhookChannel # noqa: E402


def _patch_dns(host_to_ip):
def fake_getaddrinfo(host, *_a, **_k):
ip = host_to_ip.get(host)
if ip is None:
import socket
raise socket.gaierror('mock')
return [(2, 1, 6, '', (ip, 0))]
return mock.patch('app.channels.base.socket.getaddrinfo',
side_effect=fake_getaddrinfo)


def test_blocks_loopback():
with _patch_dns({'127.0.0.1': '127.0.0.1', 'localhost': '127.0.0.1'}):
assert WebhookChannel.validate_webhook('https://127.0.0.1/x') is False
assert WebhookChannel.validate_webhook('https://localhost/x') is False


def test_blocks_cloud_metadata():
with _patch_dns({'169.254.169.254': '169.254.169.254',
'metadata.google.internal': '169.254.169.254'}):
assert WebhookChannel.validate_webhook('https://169.254.169.254/latest/meta-data/') is False
assert WebhookChannel.validate_webhook('https://metadata.google.internal/') is False


def test_blocks_rfc1918():
with _patch_dns({'10.0.0.5': '10.0.0.5',
'192.168.1.1': '192.168.1.1',
'172.16.0.1': '172.16.0.1'}):
assert WebhookChannel.validate_webhook('https://10.0.0.5/x') is False
assert WebhookChannel.validate_webhook('https://192.168.1.1/x') is False
assert WebhookChannel.validate_webhook('https://172.16.0.1/x') is False


def test_blocks_http_scheme():
assert WebhookChannel.validate_webhook('http://example.com/x') is False


def test_allows_public_host():
with _patch_dns({'example.com': '93.184.216.34'}):
assert WebhookChannel.validate_webhook('https://example.com/hook') is True


def test_send_refuses_unsafe_url():
with _patch_dns({'127.0.0.1': '127.0.0.1'}):
with mock.patch('app.channels.webhook.urlopen') as uo:
ok = WebhookChannel.send('https://127.0.0.1/x', 't', 'c')
assert ok is False
uo.assert_not_called()


if __name__ == '__main__':
test_blocks_loopback()
test_blocks_cloud_metadata()
test_blocks_rfc1918()
test_blocks_http_scheme()
test_allows_public_host()
test_send_refuses_unsafe_url()
print('OK')
Loading