Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@ __pycache__/
.pytest_cache/
.coverage
htmlcov/
<<<<<<< HEAD
.env
.venv/
venv/
=======
.venv/
.env
>>>>>>> pr-2-souravg77-pineapple-sniffer
dist/
build/
*.egg-info/
Expand Down
129 changes: 129 additions & 0 deletions src/vpn_detector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import subprocess
import re
from typing import Dict, Optional, List, Union

class PineappleDetector:
"""
A class for detecting and validating VPN connection parameters.

This class provides methods to check VPN connection security and extract
relevant network configuration details.
"""

@staticmethod
def detect_vpn_connection() -> Optional[Dict[str, str]]:
"""
Detect active VPN connection and extract its parameters.

Returns:
Optional dictionary with VPN connection details or None if no VPN is detected.

Raises:
Exception: If there's an error during VPN detection.
"""
try:
# Use a more robust method to detect VPN interfaces
result = subprocess.run(
['ip', 'addr'],
capture_output=True,
text=True,
check=True
)

# Expanded list of VPN interface types
vpn_interfaces = [
'tun', 'tap', 'ppp', 'wg', 'wireguard', 'openvpn', 'ipsec',
'nordvpn', 'protonvpn', 'surfshark'
]

for line in result.stdout.splitlines():
for interface in vpn_interfaces:
# More precise regex to capture interface name
interface_match = re.search(r'\d+:\s*({}[\w@]*):'.format(interface), line)
state_match = re.search(r'state\s+(\w+)', line)

if interface_match:
return {
'interface': interface_match.group(1),
'type': interface,
'state': state_match.group(1) if state_match else 'UNKNOWN'
}

return None

except subprocess.CalledProcessError as e:
print(f"Error detecting VPN: {e}")
return None
except Exception as e:
print(f"Unexpected error in VPN detection: {e}")
return None

@staticmethod
def validate_vpn_security(connection_info: Optional[Dict[str, str]]) -> List[str]:
"""
Validate the security of a VPN connection.

Args:
connection_info: Dictionary containing VPN connection details.

Returns:
List of security recommendations or warnings.
"""
if not connection_info:
return ["No VPN connection detected"]

recommendations = []

# Enhanced security checks
if connection_info.get('state', '').lower() != 'up':
recommendations.append("VPN interface is not in an active state")

# Type-specific recommendations
vpn_type = connection_info.get('type', '').lower()
if vpn_type in ['wireguard', 'nordvpn', 'protonvpn']:
recommendations.append(f"Recommended VPN type: {vpn_type}")

# General security recommendations
recommendations.extend([
"Use strong encryption protocols (AES-256, ChaCha20)",
"Verify VPN provider's no-log policy",
"Enable kill switch feature",
"Use multi-hop/double VPN when possible"
])

return recommendations

@staticmethod
def get_vpn_ip() -> Optional[str]:
"""
Retrieve the current VPN IP address.

Returns:
VPN IP address or None if not found.
"""
try:
# Multiple fallback IP checking services
ip_services = [
'https://api.ipify.org',
'https://ipinfo.io/ip',
'https://checkip.amazonaws.com'
]

for service in ip_services:
result = subprocess.run(
['curl', '-s', service],
capture_output=True,
text=True,
timeout=5,
check=True
)

# Basic IP validation
if result.returncode == 0 and result.stdout.strip():
return result.stdout.strip()

return None
except subprocess.CalledProcessError:
return None
except Exception:
return None
141 changes: 95 additions & 46 deletions src/vpn_security/leak_detection.py
Original file line number Diff line number Diff line change
@@ -1,110 +1,159 @@
import socket
import subprocess
import ipaddress
import logging
import re
from typing import List, Optional, Dict, Any

class VPNLeakDetector:
"""
Detect potential VPN IP and DNS leaks in network configuration.
Comprehensive VPN leak detection system.

This class provides methods to check for:
- IP address leaks
- DNS server leaks
- Network routing configuration
Detects potential IP, DNS, and network configuration leaks.
Provides granular analysis of network security vulnerabilities.
"""

def __init__(self, logger: Optional[logging.Logger] = None):
"""
Initialize VPN Leak Detector with optional logging.

Args:
logger: Optional custom logger for detailed tracking
"""
self.logger = logger or logging.getLogger(__name__)

@staticmethod
def get_public_ip() -> Optional[str]:
"""
Retrieve the current public IP address.
Retrieve the current public IP address using multiple services.

Returns:
Optional[str]: Public IP address or None if detection fails
Optional public IP address or None if detection fails
"""
try:
# Use multiple public IP checking services for reliability
ip_services = [
'https://api.ipify.org',
'https://ipinfo.io/ip',
'https://checkip.amazonaws.com'
]

for service in ip_services:
ip_services = [
'https://api.ipify.org',
'https://ipinfo.io/ip',
'https://checkip.amazonaws.com'
]

for service in ip_services:
try:
result = subprocess.run(
['curl', '-s', service],
capture_output=True,
text=True,
timeout=5
)

# Validate IP address format
if result.returncode == 0:
ip = result.stdout.strip()
try:
# Validate IP address format
ipaddress.ip_address(ip)
return ip
except ValueError:
continue

return None
except Exception:
return None
except Exception as e:
logging.warning(f"IP check failed for {service}: {e}")

return None

@staticmethod
def get_dns_servers() -> List[str]:
"""
Retrieve current DNS server configurations.
Retrieve DNS server configurations across different platforms.

Returns:
List[str]: List of configured DNS servers
List of configured DNS servers
"""
dns_servers = []

try:
# On Unix-like systems (Linux, macOS)
# Unix-like systems (Linux, macOS)
with open('/etc/resolv.conf', 'r') as f:
dns_servers = [
dns_servers.extend([
line.split()[1]
for line in f.readlines()
if line.startswith('nameserver')
]
return dns_servers
])
except FileNotFoundError:
logging.warning("resolv.conf not found")

try:
# Windows-style DNS retrieval (if applicable)
result = subprocess.run(
['ipconfig', '/all'],
capture_output=True,
text=True,
timeout=5
)

dns_matches = re.findall(r'DNS\s+Servers\s*[.:\s]+\s*(\d+\.\d+\.\d+\.\d+)', result.stdout)
dns_servers.extend(dns_matches)
except Exception:
return []
pass

return list(set(dns_servers)) # Remove duplicates

def detect_leaks(self) -> Dict[str, Any]:
"""
Comprehensive leak detection method.
Comprehensive leak detection with multi-stage verification.

Returns:
Dict[str, Any]: Detailed leak detection results
Detailed leak detection results
"""
results = {
'public_ip': None,
'dns_servers': [],
'leaks': [],
'vpn_secure': False # Default to False to match test expectation
}

# Detect public IP
public_ip = self.get_public_ip()
dns_servers = self.get_dns_servers()
results['public_ip'] = public_ip

# Simplified leak criteria - can be expanded
is_leak_detected = False
# Get DNS servers
dns_servers = self.get_dns_servers()
results['dns_servers'] = dns_servers

# Check if public IP differs from expected VPN IP
# This is a simplified check and might need VPN provider-specific logic
# Advanced leak detection criteria
if not public_ip:
is_leak_detected = True
results['leaks'].append("Unable to detect public IP")

return {
'public_ip': public_ip,
'dns_servers': dns_servers,
'leak_detected': is_leak_detected
}
# Flag potentially suspicious DNS servers
suspicious_dns = ['8.8.8.8', '8.8.4.4'] # Google's public DNS
if any(server in suspicious_dns for server in dns_servers):
results['leaks'].append("Using public DNS servers that might log queries")

# If any leaks are detected, keep vpn_secure as False
results['vpn_secure'] = len(results['leaks']) == 0

return results

def main():
"""
CLI entry point for VPN leak detection.
Provides a simple interface to run leak tests.
"""
logging.basicConfig(level=logging.INFO)
detector = VPNLeakDetector()
results = detector.detect_leaks()

print("VPN Leak Detection Results:")
print(f"Public IP: {results['public_ip']}")
print(f"DNS Servers: {', '.join(results['dns_servers'])}")
print(f"Leak Detected: {'Yes' if results['leak_detected'] else 'No'}")
try:
results = detector.detect_leaks()

print("\n--- VPN Leak Detection Results ---")
print(f"Public IP: {results['public_ip'] or 'Unknown'}")
print(f"DNS Servers: {', '.join(results['dns_servers']) or 'None detected'}")
print(f"VPN Security: {'Secure' if results['vpn_secure'] else 'Potential Leaks Detected'}")

if results['leaks']:
print("\nDetected Potential Issues:")
for leak in results['leaks']:
print(f" - {leak}")

except Exception as e:
print(f"Error during leak detection: {e}")

if __name__ == '__main__':
main()
Loading