diff --git a/.gitignore b/.gitignore index 284e270..bef6a09 100644 --- a/.gitignore +++ b/.gitignore @@ -1,48 +1,15 @@ -# Python __pycache__/ -*.py[cod] -*$py.class -*.so -.Python -env/ +*.pyc +*.pyo +*.pyd +.pytest_cache/ +.coverage +htmlcov/ +.env venv/ -ENV/ -env.bak/ -venv.bak/ -*.egg-info/ +env/ +.venv/ dist/ build/ - -# Security Analysis Results (user-generated) -*.json -*_results.txt -*_report.txt -*_audit.json -daily_check.json -weekly_audit.json -security_check.json - -# macOS -.DS_Store -.DS_Store? -._* -.Spotlight-V100 -.Trashes -ehthumbs.db -Thumbs.db - -# IDE -.vscode/ -.idea/ -*.swp -*.swo -*~ - -# Temporary files -*.tmp -*.temp -*.log - -# User configuration -config.local.py -.env \ No newline at end of file +*.egg-info/ +.DS_Store \ No newline at end of file diff --git a/src/vpn_security/__init__.py b/src/vpn_security/__init__.py new file mode 100644 index 0000000..71d38b5 --- /dev/null +++ b/src/vpn_security/__init__.py @@ -0,0 +1 @@ +# VPN Security Detection Module \ No newline at end of file diff --git a/src/vpn_security/network_config.py b/src/vpn_security/network_config.py new file mode 100644 index 0000000..352ebd0 --- /dev/null +++ b/src/vpn_security/network_config.py @@ -0,0 +1,108 @@ +import subprocess +import re +import sys +from typing import Dict, List, Optional + +class VPNConfigDetector: + """ + A class to detect and analyze VPN network configurations. + + This class provides methods to inspect network interfaces, + routing tables, and detect active VPN connections. + """ + + @staticmethod + def get_network_interfaces() -> Dict[str, str]: + """ + Retrieve network interface details. + + Returns: + Dict of network interface names and their IP addresses. + """ + try: + # Use platform-independent command for network interfaces + result = subprocess.run(['ip', 'addr'], + capture_output=True, + text=True, + check=True) + + # Very explicit regex to handle multiple line formats + pattern = re.compile( + r'^(\d+):\s*(\w+):.*\n' # Interface index and name + r'(?:.*\n)*' # Optional intermediate lines + r'\s*inet\s+(\d+\.\d+\.\d+\.\d+).*$', # Capture IP with flexible formatting + re.MULTILINE + ) + + interfaces = {} + for match in pattern.finditer(result.stdout): + interface_name = match.group(2) + ip_address = match.group(3) + interfaces[interface_name] = ip_address + + print(f"DEBUG: Full output: {result.stdout}", file=sys.stderr) + print(f"DEBUG: Detected interfaces: {interfaces}", file=sys.stderr) + return interfaces + + except (subprocess.CalledProcessError, FileNotFoundError) as e: + print(f"DEBUG: Error in get_network_interfaces: {e}", file=sys.stderr) + # Fallback for systems without 'ip' command + return {} + + @staticmethod + def get_routing_table() -> List[Dict[str, str]]: + """ + Retrieve the system routing table. + + Returns: + List of routing table entries with details. + """ + try: + result = subprocess.run(['ip', 'route'], + capture_output=True, + text=True, + check=True) + + routes = [] + for line in result.stdout.split('\n'): + route_parts = line.split() + if len(route_parts) >= 5: + route_entry = { + 'destination': route_parts[0], + 'via': route_parts[2] if len(route_parts) > 2 else '', + 'dev': route_parts[4] if len(route_parts) > 4 else '' + } + routes.append(route_entry) + + return routes + except (subprocess.CalledProcessError, FileNotFoundError): + return [] + + @classmethod + def detect_vpn_connection(cls) -> Optional[Dict[str, str]]: + """ + Detect if a VPN connection is active. + + Returns: + Dictionary with VPN connection details, or None if no VPN detected. + """ + interfaces = cls.get_network_interfaces() + routes = cls.get_routing_table() + + # Common VPN interface names and checks + vpn_interface_keywords = ['tun', 'tap', 'ppp', 'wg', 'vpn'] + + # Check for known VPN interfaces + for interface, ip in interfaces.items(): + if any(keyword in interface.lower() for keyword in vpn_interface_keywords): + return { + 'interface': interface, + 'ip_address': ip + } + + # Check routing table for potential VPN routes + for route in routes: + if any(keyword in str(route).lower() for keyword in vpn_interface_keywords): + return route + + return None \ No newline at end of file diff --git a/tests/test_network_config.py b/tests/test_network_config.py new file mode 100644 index 0000000..64bd0cd --- /dev/null +++ b/tests/test_network_config.py @@ -0,0 +1,83 @@ +import subprocess +import pytest +from unittest.mock import patch +from src.vpn_security.network_config import VPNConfigDetector + +class TestVPNConfigDetector: + def test_get_network_interfaces(self): + # Mock subprocess to return predefined output + mock_output = """ + 1: lo: mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000 + link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00 + inet 127.0.0.1/8 scope host lo + valid_lft forever preferred_lft forever + 2: eth0: mtu 1500 qdisc fq_codel state UP group default qlen 1000 + link/ether 00:11:22:33:44:55 brd ff:ff:ff:ff:ff:ff + inet 192.168.1.100/24 brd 192.168.1.255 scope global dynamic eth0 + valid_lft 86313sec preferred_lft 86313sec + 3: tun0: mtu 1500 qdisc fq_codel state UNKNOWN group default qlen 500 + link/none + inet 10.8.0.1/24 brd 10.8.0.255 scope global tun0 + valid_lft forever preferred_lft forever + """ + + with patch('subprocess.run') as mock_run: + # Create a MagicMock object with a stdout attribute + mock_run.return_value.stdout = mock_output + mock_run.return_value.check = True + + interfaces = VPNConfigDetector.get_network_interfaces() + + # Look for 'tun0' interface case-insensitively + tun_interface = [iface for iface in interfaces.keys() if iface.lower() == 'tun0'] + assert len(tun_interface) > 0, f"No tun0 interface found in {interfaces}" + assert interfaces[tun_interface[0]] == '10.8.0.1' + + def test_get_routing_table(self): + # Mock routing table output + mock_output = """ +default via 192.168.1.1 dev eth0 proto dhcp metric 100 +10.8.0.0/24 dev tun0 proto kernel scope link src 10.8.0.1 +192.168.1.0/24 dev eth0 proto kernel scope link src 192.168.1.100 metric 100 +""" + + with patch('subprocess.run') as mock_run: + mock_run.return_value.stdout = mock_output + mock_run.return_value.check = True + + routes = VPNConfigDetector.get_routing_table() + + assert any('tun0' in str(route) for route in routes) + + def test_detect_vpn_connection(self): + # Scenario with VPN connection + with patch.object(VPNConfigDetector, 'get_network_interfaces', + return_value={'tun0': '10.8.0.1'}): + with patch.object(VPNConfigDetector, 'get_routing_table', + return_value=[{'dev': 'tun0'}]): + + vpn_connection = VPNConfigDetector.detect_vpn_connection() + + assert vpn_connection is not None + assert vpn_connection['interface'] == 'tun0' + assert vpn_connection['ip_address'] == '10.8.0.1' + + def test_no_vpn_connection(self): + # Scenario without VPN connection + with patch.object(VPNConfigDetector, 'get_network_interfaces', + return_value={'eth0': '192.168.1.100'}): + with patch.object(VPNConfigDetector, 'get_routing_table', + return_value=[{'dev': 'eth0'}]): + + vpn_connection = VPNConfigDetector.detect_vpn_connection() + + assert vpn_connection is None + + def test_vpn_connection_error_handling(self): + # Test error handling when subprocess fails + with patch('subprocess.run', side_effect=subprocess.CalledProcessError(1, 'cmd')): + interfaces = VPNConfigDetector.get_network_interfaces() + routes = VPNConfigDetector.get_routing_table() + + assert interfaces == {} + assert routes == [] \ No newline at end of file