diff --git a/recon_module.py b/recon_module.py index df56bec..adcafc8 100755 --- a/recon_module.py +++ b/recon_module.py @@ -18,63 +18,64 @@ from rich.console import Console from rich.progress import Progress, SpinnerColumn, TextColumn + class EnhancedReconModule: - def __init__(self, base_dir: Path = None, ai_analyzer: Any = None, config_path: str = None): + def __init__( + self, base_dir: Path = None, ai_analyzer: Any = None, config_path: str = None + ): """Initialize the reconnaissance module.""" self.config = { "subfinder": { "sources": ["bevigil", "binaryedge", "bufferover", "c99", "censys"], - "timeout": 30 + "timeout": 30, }, "nmap": { "default_flags": ["-sS", "-T4", "--max-retries=1"], "stealth_flags": ["-sS", "-T2", "-f"], - "aggressive_flags": ["-sS", "-T5", "-A"] - }, - "httpx": { - "threads": 50, - "timeout": 10, - "follow_redirects": True + "aggressive_flags": ["-sS", "-T5", "-A"], }, - "nuclei": { - "severity": ["critical", "high", "medium"] - } + "httpx": {"threads": 50, "timeout": 10, "follow_redirects": True}, + "nuclei": {"severity": ["critical", "high", "medium"]}, } - + self.base_dir = base_dir or Path.cwd() self.ai_analyzer = ai_analyzer self.logger = logging.getLogger(__name__) self.console = Console() - + if config_path: try: - with open(config_path, 'r') as f: + with open(config_path, "r") as f: user_config = json.load(f) # Deep merge user config with defaults self.config = self._deep_merge(self.config, user_config) except Exception as e: self.logger.warning("Failed to load config from %s: %s", config_path, e) self.config = self._get_default_config() - + def _deep_merge(self, default, user): """Deep merge two dictionaries.""" result = default.copy() for key, value in user.items(): - if key in result and isinstance(result[key], dict) and isinstance(value, dict): + if ( + key in result + and isinstance(result[key], dict) + and isinstance(value, dict) + ): result[key] = self._deep_merge(result[key], value) else: result[key] = value return result - + async def run_command(self, cmd: List[str], timeout: int = 300) -> str: """Run shell command asynchronously""" try: process = await asyncio.create_subprocess_exec( - *cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE + *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE + ) + stdout, stderr = await asyncio.wait_for( + process.communicate(), timeout=timeout ) - stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout) return stdout.decode().strip() except asyncio.TimeoutError: self.logger.error("Command timed out: %s", " ".join(cmd)) @@ -82,63 +83,81 @@ async def run_command(self, cmd: List[str], timeout: int = 300) -> str: except Exception as e: self.logger.error("Error running command: %s", e) return "" - + async def discover_subdomains(self, domain: str) -> List[str]: """Discover subdomains using subfinder""" self.console.print("[bold blue]Discovering subdomains...[/bold blue]") - + # Use all available sources instead of restricting to specific ones # This allows subfinder to use passive sources that don't require API keys cmd = ["subfinder", "-d", domain, "-silent", "-all"] - + output = await self.run_command(cmd) if not output: - self.logger.warning("No output from subfinder for %s. Check if subfinder is installed and configured correctly.", domain) - self.console.print("Warning: No output from subfinder for %s. Check if subfinder is installed and configured correctly." % domain) + self.logger.warning( + "No output from subfinder for %s. Check if subfinder is installed and configured correctly.", + domain, + ) + self.console.print( + "Warning: No output from subfinder for %s. Check if subfinder is installed and configured correctly." + % domain + ) subdomains = [line.strip() for line in output.splitlines() if line.strip()] self.console.print(f"[green]Found {len(subdomains)} subdomains[/green]") if not subdomains: - self.logger.warning("No subdomains found for %s. Check subfinder installation, network, and API keys.", domain) - self.console.print("Warning: No subdomains found for %s. Check subfinder installation, network, and API keys." % domain) + self.logger.warning( + "No subdomains found for %s. Check subfinder installation, network, and API keys.", + domain, + ) + self.console.print( + "Warning: No subdomains found for %s. Check subfinder installation, network, and API keys." + % domain + ) return subdomains - + async def probe_web_services(self, subdomains: List[str]) -> List[Dict]: """Probe web services using httpx""" self.console.print("[bold blue]Probing web services...[/bold blue]") - + # SECURITY FIX: Use tempfile module instead of hardcoded temp paths # This prevents path traversal attacks and ensures proper cleanup - with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as temp_file: - temp_file.write('\n'.join(subdomains)) + with tempfile.NamedTemporaryFile( + mode="w", suffix=".txt", delete=False + ) as temp_file: + temp_file.write("\n".join(subdomains)) temp_file_path = temp_file.name - + try: # Run httpx cmd = [ "httpx", - "-l", temp_file_path, + "-l", + temp_file_path, "-json", "-status-code", "-title", "-tech-detect", - "-o", "-" + "-o", + "-", ] - + output = await self.run_command(cmd) services = [] - + for line in output.splitlines(): try: service = json.loads(line) - services.append({ - "url": service.get("url", ""), - "status_code": service.get("status-code", 0), - "title": service.get("title", ""), - "technologies": service.get("technologies", []) - }) + services.append( + { + "url": service.get("url", ""), + "status_code": service.get("status-code", 0), + "title": service.get("title", ""), + "technologies": service.get("technologies", []), + } + ) except json.JSONDecodeError: continue - + self.console.print(f"[green]Found {len(services)} web services[/green]") return services finally: @@ -147,43 +166,51 @@ async def probe_web_services(self, subdomains: List[str]) -> List[Dict]: os.unlink(temp_file_path) except OSError: pass # File may already be deleted - + async def scan_vulnerabilities(self, web_services: List[Dict]) -> List[Dict]: """Scan web services for vulnerabilities using nuclei""" self.console.print("[bold blue]Scanning for vulnerabilities...[/bold blue]") - + # SECURITY FIX: Use tempfile module instead of hardcoded temp paths - with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as temp_file: - temp_file.write('\n'.join(service["url"] for service in web_services)) + with tempfile.NamedTemporaryFile( + mode="w", suffix=".txt", delete=False + ) as temp_file: + temp_file.write("\n".join(service["url"] for service in web_services)) temp_file_path = temp_file.name - + try: # Run nuclei cmd = [ "nuclei", - "-l", temp_file_path, + "-l", + temp_file_path, "-json", - "-severity", ",".join(self.config["nuclei"]["severity"]), - "-silent" + "-severity", + ",".join(self.config["nuclei"]["severity"]), + "-silent", ] - + output = await self.run_command(cmd) vulnerabilities = [] - + for line in output.splitlines(): try: vuln = json.loads(line) - vulnerabilities.append({ - "url": vuln.get("url", ""), - "type": vuln.get("type", ""), - "severity": vuln.get("severity", ""), - "description": vuln.get("description", ""), - "template": vuln.get("template", "") - }) + vulnerabilities.append( + { + "url": vuln.get("url", ""), + "type": vuln.get("type", ""), + "severity": vuln.get("severity", ""), + "description": vuln.get("description", ""), + "template": vuln.get("template", ""), + } + ) except json.JSONDecodeError: continue - - self.console.print(f"[green]Found {len(vulnerabilities)} potential vulnerabilities[/green]") + + self.console.print( + f"[green]Found {len(vulnerabilities)} potential vulnerabilities[/green]" + ) return vulnerabilities finally: # SECURITY FIX: Ensure temp file is always cleaned up @@ -191,30 +218,36 @@ async def scan_vulnerabilities(self, web_services: List[Dict]) -> List[Dict]: os.unlink(temp_file_path) except OSError: pass # File may already be deleted - - async def run_recon(self, target: str, output_dir: Optional[Path] = None) -> Dict[str, Any]: + + async def run_recon( + self, target: str, output_dir: Optional[Path] = None + ) -> Dict[str, Any]: """Run comprehensive reconnaissance on target""" - self.console.print(f"[bold blue]Starting reconnaissance on {target}[/bold blue]") - + self.console.print( + f"[bold blue]Starting reconnaissance on {target}[/bold blue]" + ) + # SECURITY FIX: Ensure output_dir is a Path object if output_dir is None: output_dir = self.base_dir / "recon_results" / target.replace(".", "_") elif isinstance(output_dir, str): output_dir = Path(output_dir) - + # Create output directory output_dir.mkdir(parents=True, exist_ok=True) - + # Run reconnaissance tasks sequentially for now # SECURITY FIX: Fixed async/await issues and added missing methods try: subdomains = await self.discover_subdomains(target) web_services = await self.probe_web_services(subdomains) vulnerabilities = await self.scan_vulnerabilities(web_services) - + # Generate AI analysis - ai_analysis = await self._analyze_results(target, subdomains, web_services, vulnerabilities) - + ai_analysis = await self._analyze_results( + target, subdomains, web_services, vulnerabilities + ) + # Prepare results results = { "target": target, @@ -222,14 +255,14 @@ async def run_recon(self, target: str, output_dir: Optional[Path] = None) -> Dic "subdomains": subdomains, "web_services": web_services, "vulnerabilities": vulnerabilities, - "ai_analysis": ai_analysis + "ai_analysis": ai_analysis, } - + # Save results await self._save_results(results, output_dir) - + return results - + except Exception as e: self.logger.error("Error during reconnaissance: %s", e) return { @@ -239,14 +272,19 @@ async def run_recon(self, target: str, output_dir: Optional[Path] = None) -> Dic "subdomains": [], "web_services": [], "vulnerabilities": [], - "ai_analysis": {} + "ai_analysis": {}, } - - async def _analyze_results(self, target: str, subdomains: List[str], - web_services: List[Dict], vulnerabilities: List[Dict]) -> Dict: + + async def _analyze_results( + self, + target: str, + subdomains: List[str], + web_services: List[Dict], + vulnerabilities: List[Dict], + ) -> Dict: """Analyze results using AI""" self.console.print("[bold blue]Analyzing results with AI...[/bold blue]") - + # Prepare data for AI analysis analysis_data = { "target": target, @@ -254,16 +292,18 @@ async def _analyze_results(self, target: str, subdomains: List[str], "web_service_count": len(web_services), "vulnerability_count": len(vulnerabilities), "critical_findings": [], - "recommendations": [] + "recommendations": [], } - + # Analyze vulnerabilities if vulnerabilities: - vuln_summary = "\n".join([ - f"- {v['type']} ({v['severity']}): {v['description']}" - for v in vulnerabilities - ]) - + vuln_summary = "\n".join( + [ + f"- {v['type']} ({v['severity']}): {v['description']}" + for v in vulnerabilities + ] + ) + prompt = f""" Analyze these security findings for {target}: @@ -274,7 +314,7 @@ async def _analyze_results(self, target: str, subdomains: List[str], 2. Recommended next steps for exploitation 3. Potential attack chains """ - + ai_response = self.ai_analyzer.ollama.generate(prompt) if ai_response: try: @@ -283,24 +323,25 @@ async def _analyze_results(self, target: str, subdomains: List[str], except: # If JSON parsing fails, use raw response analysis_data["raw_analysis"] = ai_response - + return analysis_data - + async def _save_results(self, results: Dict, output_dir: Path): """Save results in multiple formats""" # Save JSON json_path = output_dir / "results.json" try: - async with aiofiles.open(json_path, 'w') as f: + async with aiofiles.open(json_path, "w") as f: await f.write(json.dumps(results, indent=2)) except (aiofiles.OSError, aiofiles.IOError) as e: self.logger.error("Error writing results: %s", e) - + # Generate Markdown report md_path = output_dir / "report.md" try: - async with aiofiles.open(md_path, 'w') as f: - await f.write(f"""# VulnForge Reconnaissance Report + async with aiofiles.open(md_path, "w") as f: + await f.write( + f"""# VulnForge Reconnaissance Report ## Target: {results['target']} ## Scan Time: {results['timestamp']} @@ -321,8 +362,9 @@ async def _save_results(self, results: Dict, output_dir: Path): ### AI Analysis {json.dumps(results['ai_analysis'], indent=2)} -""") +""" + ) except (aiofiles.OSError, aiofiles.IOError) as e: self.logger.error("Error writing results: %s", e) - - self.console.print(f"[green]Results saved to: {output_dir}[/green]") \ No newline at end of file + + self.console.print(f"[green]Results saved to: {output_dir}[/green]")