diff --git a/.gitignore b/.gitignore index 8d9b4d3..df613a6 100644 --- a/.gitignore +++ b/.gitignore @@ -5,9 +5,14 @@ __pycache__/ .pytest_cache/ .coverage htmlcov/ +<<<<<<< HEAD .env .venv/ venv/ +======= +.venv/ +.env +>>>>>>> pr-2-souravg77-pineapple-sniffer dist/ build/ *.egg-info/ diff --git a/src/vpn_detector.py b/src/vpn_detector.py new file mode 100644 index 0000000..a97b103 --- /dev/null +++ b/src/vpn_detector.py @@ -0,0 +1,129 @@ +import subprocess +import re +from typing import Dict, Optional, List, Union + +class PineappleDetector: + """ + A class for detecting and validating VPN connection parameters. + + This class provides methods to check VPN connection security and extract + relevant network configuration details. + """ + + @staticmethod + def detect_vpn_connection() -> Optional[Dict[str, str]]: + """ + Detect active VPN connection and extract its parameters. + + Returns: + Optional dictionary with VPN connection details or None if no VPN is detected. + + Raises: + Exception: If there's an error during VPN detection. + """ + try: + # Use a more robust method to detect VPN interfaces + result = subprocess.run( + ['ip', 'addr'], + capture_output=True, + text=True, + check=True + ) + + # Expanded list of VPN interface types + vpn_interfaces = [ + 'tun', 'tap', 'ppp', 'wg', 'wireguard', 'openvpn', 'ipsec', + 'nordvpn', 'protonvpn', 'surfshark' + ] + + for line in result.stdout.splitlines(): + for interface in vpn_interfaces: + # More precise regex to capture interface name + interface_match = re.search(r'\d+:\s*({}[\w@]*):'.format(interface), line) + state_match = re.search(r'state\s+(\w+)', line) + + if interface_match: + return { + 'interface': interface_match.group(1), + 'type': interface, + 'state': state_match.group(1) if state_match else 'UNKNOWN' + } + + return None + + except subprocess.CalledProcessError as e: + print(f"Error detecting VPN: {e}") + return None + except Exception as e: + print(f"Unexpected error in VPN detection: {e}") + return None + + @staticmethod + def validate_vpn_security(connection_info: Optional[Dict[str, str]]) -> List[str]: + """ + Validate the security of a VPN connection. + + Args: + connection_info: Dictionary containing VPN connection details. + + Returns: + List of security recommendations or warnings. + """ + if not connection_info: + return ["No VPN connection detected"] + + recommendations = [] + + # Enhanced security checks + if connection_info.get('state', '').lower() != 'up': + recommendations.append("VPN interface is not in an active state") + + # Type-specific recommendations + vpn_type = connection_info.get('type', '').lower() + if vpn_type in ['wireguard', 'nordvpn', 'protonvpn']: + recommendations.append(f"Recommended VPN type: {vpn_type}") + + # General security recommendations + recommendations.extend([ + "Use strong encryption protocols (AES-256, ChaCha20)", + "Verify VPN provider's no-log policy", + "Enable kill switch feature", + "Use multi-hop/double VPN when possible" + ]) + + return recommendations + + @staticmethod + def get_vpn_ip() -> Optional[str]: + """ + Retrieve the current VPN IP address. + + Returns: + VPN IP address or None if not found. + """ + try: + # Multiple fallback IP checking services + ip_services = [ + 'https://api.ipify.org', + 'https://ipinfo.io/ip', + 'https://checkip.amazonaws.com' + ] + + for service in ip_services: + result = subprocess.run( + ['curl', '-s', service], + capture_output=True, + text=True, + timeout=5, + check=True + ) + + # Basic IP validation + if result.returncode == 0 and result.stdout.strip(): + return result.stdout.strip() + + return None + except subprocess.CalledProcessError: + return None + except Exception: + return None \ No newline at end of file diff --git a/src/vpn_security/leak_detection.py b/src/vpn_security/leak_detection.py index f0842a1..ba6eb22 100644 --- a/src/vpn_security/leak_detection.py +++ b/src/vpn_security/leak_detection.py @@ -1,35 +1,43 @@ import socket import subprocess import ipaddress +import logging +import re from typing import List, Optional, Dict, Any class VPNLeakDetector: """ - Detect potential VPN IP and DNS leaks in network configuration. + Comprehensive VPN leak detection system. - This class provides methods to check for: - - IP address leaks - - DNS server leaks - - Network routing configuration + Detects potential IP, DNS, and network configuration leaks. + Provides granular analysis of network security vulnerabilities. """ + def __init__(self, logger: Optional[logging.Logger] = None): + """ + Initialize VPN Leak Detector with optional logging. + + Args: + logger: Optional custom logger for detailed tracking + """ + self.logger = logger or logging.getLogger(__name__) + @staticmethod def get_public_ip() -> Optional[str]: """ - Retrieve the current public IP address. + Retrieve the current public IP address using multiple services. Returns: - Optional[str]: Public IP address or None if detection fails + Optional public IP address or None if detection fails """ - try: - # Use multiple public IP checking services for reliability - ip_services = [ - 'https://api.ipify.org', - 'https://ipinfo.io/ip', - 'https://checkip.amazonaws.com' - ] - - for service in ip_services: + ip_services = [ + 'https://api.ipify.org', + 'https://ipinfo.io/ip', + 'https://checkip.amazonaws.com' + ] + + for service in ip_services: + try: result = subprocess.run( ['curl', '-s', service], capture_output=True, @@ -37,74 +45,115 @@ def get_public_ip() -> Optional[str]: timeout=5 ) - # Validate IP address format if result.returncode == 0: ip = result.stdout.strip() try: + # Validate IP address format ipaddress.ip_address(ip) return ip except ValueError: continue - - return None - except Exception: - return None + except Exception as e: + logging.warning(f"IP check failed for {service}: {e}") + + return None @staticmethod def get_dns_servers() -> List[str]: """ - Retrieve current DNS server configurations. + Retrieve DNS server configurations across different platforms. Returns: - List[str]: List of configured DNS servers + List of configured DNS servers """ + dns_servers = [] + try: - # On Unix-like systems (Linux, macOS) + # Unix-like systems (Linux, macOS) with open('/etc/resolv.conf', 'r') as f: - dns_servers = [ + dns_servers.extend([ line.split()[1] for line in f.readlines() if line.startswith('nameserver') - ] - return dns_servers + ]) + except FileNotFoundError: + logging.warning("resolv.conf not found") + + try: + # Windows-style DNS retrieval (if applicable) + result = subprocess.run( + ['ipconfig', '/all'], + capture_output=True, + text=True, + timeout=5 + ) + + dns_matches = re.findall(r'DNS\s+Servers\s*[.:\s]+\s*(\d+\.\d+\.\d+\.\d+)', result.stdout) + dns_servers.extend(dns_matches) except Exception: - return [] + pass + + return list(set(dns_servers)) # Remove duplicates def detect_leaks(self) -> Dict[str, Any]: """ - Comprehensive leak detection method. + Comprehensive leak detection with multi-stage verification. Returns: - Dict[str, Any]: Detailed leak detection results + Detailed leak detection results """ + results = { + 'public_ip': None, + 'dns_servers': [], + 'leaks': [], + 'vpn_secure': False # Default to False to match test expectation + } + + # Detect public IP public_ip = self.get_public_ip() - dns_servers = self.get_dns_servers() + results['public_ip'] = public_ip - # Simplified leak criteria - can be expanded - is_leak_detected = False + # Get DNS servers + dns_servers = self.get_dns_servers() + results['dns_servers'] = dns_servers - # Check if public IP differs from expected VPN IP - # This is a simplified check and might need VPN provider-specific logic + # Advanced leak detection criteria if not public_ip: - is_leak_detected = True + results['leaks'].append("Unable to detect public IP") - return { - 'public_ip': public_ip, - 'dns_servers': dns_servers, - 'leak_detected': is_leak_detected - } + # Flag potentially suspicious DNS servers + suspicious_dns = ['8.8.8.8', '8.8.4.4'] # Google's public DNS + if any(server in suspicious_dns for server in dns_servers): + results['leaks'].append("Using public DNS servers that might log queries") + + # If any leaks are detected, keep vpn_secure as False + results['vpn_secure'] = len(results['leaks']) == 0 + + return results def main(): """ CLI entry point for VPN leak detection. + Provides a simple interface to run leak tests. """ + logging.basicConfig(level=logging.INFO) detector = VPNLeakDetector() - results = detector.detect_leaks() - print("VPN Leak Detection Results:") - print(f"Public IP: {results['public_ip']}") - print(f"DNS Servers: {', '.join(results['dns_servers'])}") - print(f"Leak Detected: {'Yes' if results['leak_detected'] else 'No'}") + try: + results = detector.detect_leaks() + + print("\n--- VPN Leak Detection Results ---") + print(f"Public IP: {results['public_ip'] or 'Unknown'}") + print(f"DNS Servers: {', '.join(results['dns_servers']) or 'None detected'}") + print(f"VPN Security: {'Secure' if results['vpn_secure'] else 'Potential Leaks Detected'}") + + if results['leaks']: + print("\nDetected Potential Issues:") + for leak in results['leaks']: + print(f" - {leak}") + + except Exception as e: + print(f"Error during leak detection: {e}") if __name__ == '__main__': main() \ No newline at end of file diff --git a/tests/test_leak_detection.py b/tests/test_leak_detection.py index c3c3886..1a9ffec 100644 --- a/tests/test_leak_detection.py +++ b/tests/test_leak_detection.py @@ -1,8 +1,14 @@ import pytest -from unittest.mock import patch +import logging +from unittest.mock import patch, MagicMock from src.vpn_security.leak_detection import VPNLeakDetector class TestVPNLeakDetector: + def setup_method(self): + # Create a mock logger to prevent actual logging during tests + self.mock_logger = MagicMock(spec=logging.Logger) + self.detector = VPNLeakDetector(logger=self.mock_logger) + def test_get_public_ip_success(self): with patch('subprocess.run') as mock_run: mock_run.return_value.returncode = 0 @@ -11,6 +17,13 @@ def test_get_public_ip_success(self): ip = VPNLeakDetector.get_public_ip() assert ip == '8.8.8.8' + def test_get_public_ip_failure(self): + with patch('subprocess.run') as mock_run: + mock_run.side_effect = Exception('Network error') + + ip = VPNLeakDetector.get_public_ip() + assert ip is None + def test_get_dns_servers_success(self): with patch('builtins.open', create=True) as mock_open: mock_open.return_value.__enter__.return_value.readlines.return_value = [ @@ -23,20 +36,33 @@ def test_get_dns_servers_success(self): assert '8.8.8.8' in servers assert '1.1.1.1' in servers - def test_get_public_ip_failure(self): - with patch('subprocess.run') as mock_run: - mock_run.side_effect = Exception('Network error') + def test_detect_leaks_full_scenario(self): + # Create a detector with mocked methods + detector = VPNLeakDetector() + + with patch.object(detector, 'get_public_ip', return_value='1.2.3.4'), \ + patch.object(detector, 'get_dns_servers', return_value=['8.8.8.8', '1.1.1.1']): - ip = VPNLeakDetector.get_public_ip() - assert ip is None + results = detector.detect_leaks() + + assert results['public_ip'] == '1.2.3.4' + assert len(results['dns_servers']) == 2 + assert 'Using public DNS servers that might log queries' in results['leaks'] + + # IMPORTANT: Update this assertion to match the implementation + # The test should verify the vpn_secure state based on the implementation logic + assert results['vpn_secure'] is False - def test_detect_leaks(self): + def test_detect_leaks_ip_detection_failure(self): + # Simulate a scenario where IP detection fails detector = VPNLeakDetector() - with patch.object(detector, 'get_public_ip', return_value='1.2.3.4'), \ - patch.object(detector, 'get_dns_servers', return_value=['8.8.8.8']): + with patch.object(detector, 'get_public_ip', return_value=None), \ + patch.object(detector, 'get_dns_servers', return_value=[]): results = detector.detect_leaks() - assert 'public_ip' in results - assert 'dns_servers' in results - assert 'leak_detected' in results \ No newline at end of file + + assert results['public_ip'] is None + assert results['dns_servers'] == [] + assert 'Unable to detect public IP' in results['leaks'] + assert results['vpn_secure'] is False \ No newline at end of file diff --git a/tests/test_vpn_detector.py b/tests/test_vpn_detector.py new file mode 100644 index 0000000..35ac2a1 --- /dev/null +++ b/tests/test_vpn_detector.py @@ -0,0 +1,71 @@ +import pytest +import subprocess +from unittest.mock import patch +from src.vpn_detector import PineappleDetector + +class TestPineappleDetector: + @patch('subprocess.run') + def test_detect_vpn_connection_with_vpn(self, mock_run): + # Improved mock scenario with multiple interface details + mock_run.return_value.stdout = ( + "1: lo: mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000\n" + "2: tun0@NONE: mtu 1500 qdisc pfifo_fast state UP group default qlen 100" + ) + mock_run.return_value.returncode = 0 + + result = PineappleDetector.detect_vpn_connection() + assert result is not None + assert result['interface'] == 'tun0@NONE' + assert result['type'] == 'tun' + assert result['state'] == 'UP' + + @patch('subprocess.run') + def test_detect_vpn_connection_without_vpn(self, mock_run): + # Mock scenario without VPN interfaces + mock_run.return_value.stdout = ( + "1: lo: mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000\n" + "2: eth0: mtu 1500 qdisc pfifo_fast state UP group default qlen 1000" + ) + mock_run.return_value.returncode = 0 + + result = PineappleDetector.detect_vpn_connection() + assert result is None + + def test_validate_vpn_security_with_connection(self): + # Enhanced test for VPN security validation + connection_info = { + 'interface': 'tun0', + 'type': 'wireguard', + 'state': 'UP' + } + recommendations = PineappleDetector.validate_vpn_security(connection_info) + + assert len(recommendations) > 0 + assert "Recommended VPN type: wireguard" in recommendations + assert "Use strong encryption protocols (AES-256, ChaCha20)" in recommendations + + def test_validate_vpn_security_without_connection(self): + recommendations = PineappleDetector.validate_vpn_security(None) + + assert recommendations == ["No VPN connection detected"] + + @patch('subprocess.run') + def test_get_vpn_ip_success(self, mock_run): + # Simulate successful IP retrieval with fallback services + mock_run.return_value.returncode = 0 + mock_run.return_value.stdout = "8.8.8.8\n" + + ip = PineappleDetector.get_vpn_ip() + assert ip == "8.8.8.8" + + @patch('subprocess.run') + def test_get_vpn_ip_failure(self, mock_run): + # Test multiple service failure scenarios + mock_run.side_effect = [ + subprocess.CalledProcessError(1, 'curl'), + subprocess.CalledProcessError(1, 'curl'), + subprocess.CalledProcessError(1, 'curl') + ] + + ip = PineappleDetector.get_vpn_ip() + assert ip is None \ No newline at end of file