From 609c20987db03861923020a777449703a7532f92 Mon Sep 17 00:00:00 2001 From: "deepsource-autofix[bot]" <62050782+deepsource-autofix[bot]@users.noreply.github.com> Date: Fri, 5 Dec 2025 14:56:50 +0000 Subject: [PATCH] style: format code with Black This commit fixes the style issues introduced in e4e873b according to the output from Black. Details: None --- ai_controller.py | 72 ++++++++------ ai_integration.py | 236 +++++++++++++++++++++++++++------------------- conftest.py | 1 - utils/notifier.py | 98 ++++++++++--------- vulnforge_main.py | 103 +++++++++++++------- 5 files changed, 299 insertions(+), 211 deletions(-) diff --git a/ai_controller.py b/ai_controller.py index 43858aa..fa0d495 100644 --- a/ai_controller.py +++ b/ai_controller.py @@ -18,12 +18,13 @@ from utils.context_builder import ContextBuilder from utils.notifier import Notifier + class AIController: """Controls AI operations and decision making.""" def __init__(self, session_dir: str, config_path: str): """Initialize the AI controller. - + Args: session_dir: Directory to store session data config_path: Path to configuration file @@ -35,7 +36,7 @@ def __init__(self, session_dir: str, config_path: str): self.notifier = Notifier(session_dir, config_path) self.report_generator = ReportGenerator(self.session_dir) self.output_format = "all" # Default output format - + # Initialize session data self.session_data = { "start_time": datetime.now().isoformat(), @@ -44,16 +45,18 @@ def __init__(self, session_dir: str, config_path: str): "findings": [], "execution_history": [], "ai_decisions": [], - "errors": [] + "errors": [], } - async def process_query(self, query: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: + async def process_query( + self, query: str, context: Optional[Dict[str, Any]] = None + ) -> Dict[str, Any]: """Process an AI query. - + Args: query: The query to process context: Optional context data - + Returns: Dictionary containing the response and metadata """ @@ -62,13 +65,13 @@ async def process_query(self, query: str, context: Optional[Dict[str, Any]] = No raise ValueError("Query must be a non-empty string") self.logger.info("Processing AI query: %s", query) - + try: # Prepare context context_str = "" if context: context_str = json.dumps(context, indent=2) - + # Create prompt prompt = f"""You are a security expert. Please provide guidance on the following query: @@ -85,30 +88,30 @@ async def process_query(self, query: str, context: Optional[Dict[str, Any]] = No 5. Safety considerations Provide a detailed response:""" - + # Get AI response response = await self._get_ai_response(prompt) - + # Log response self.logger.info("AI response received") - + return { "answer": response, # Return an empty log list as the logger doesn't expose in-memory logs "logs": [], - "prompt": prompt + "prompt": prompt, } - + except Exception as e: self.logger.error("Error processing query: %s", str(e)) raise async def _get_ai_response(self, prompt: str) -> str: """Get response from AI model. - + Args: prompt: The prompt to send to the AI - + Returns: The AI's response """ @@ -116,19 +119,19 @@ async def _get_ai_response(self, prompt: str) -> str: # TODO: Implement actual AI model call # For now, return a placeholder response return "This is a placeholder response. AI integration pending." - + except Exception as e: self.logger.error("Error getting AI response: %s", str(e)) raise def _generate_report(self) -> Dict[str, str]: """Generate reports for the current session. - + Returns: Dictionary mapping report types to their file paths """ self.logger.info("Generating reports in %s format...", self.output_format) - + try: # Prepare context data context = { @@ -136,7 +139,7 @@ def _generate_report(self) -> Dict[str, str]: "domain": self.session_data.get("target_domain"), "ip": self.session_data.get("target_ip"), "scan_time": self.session_data.get("start_time"), - "duration": self._calculate_duration() + "duration": self._calculate_duration(), }, "modules": self.session_data["modules"], "tools": self.session_data["tools"], @@ -144,45 +147,50 @@ def _generate_report(self) -> Dict[str, str]: "exploits": self.session_data.get("execution_history", []), "defensive_measures": self.session_data.get("defensive_measures", []), "recommendations": self.session_data.get("recommendations", []), - "errors": self.session_data["errors"] + "errors": self.session_data["errors"], } - + # Generate reports - report_paths = self.report_generator.generate_reports(context, self.output_format) - + report_paths = self.report_generator.generate_reports( + context, self.output_format + ) + self.logger.info("Reports generated successfully") return report_paths - + except Exception as e: self.logger.error("Error generating reports: %s", str(e)) raise def _calculate_duration(self) -> str: """Calculate session duration. - + Returns: Formatted duration string """ start_time = datetime.fromisoformat(self.session_data["start_time"]) end_time = datetime.now() duration = end_time - start_time - + hours = duration.seconds // 3600 minutes = (duration.seconds % 3600) // 60 seconds = duration.seconds % 60 - + return f"{hours:02d}:{minutes:02d}:{seconds:02d}" def setup_ai(self) -> bool: """Setup AI environment: check Ollama service and model availability.""" try: - if hasattr(self, 'ollama'): + if hasattr(self, "ollama"): client = self.ollama else: from ai_integration import OllamaClient + client = OllamaClient() if not client.is_available(): - self.logger.error("Ollama service not available. Start with: ollama serve") + self.logger.error( + "Ollama service not available. Start with: ollama serve" + ) return False models = client.list_models() if not models: @@ -190,8 +198,10 @@ def setup_ai(self) -> bool: if not client.pull_model(client.main_model): self.logger.error("Failed to pull default model") return False - self.logger.info("AI setup complete. Available models: %s", [m['name'] for m in models]) + self.logger.info( + "AI setup complete. Available models: %s", [m["name"] for m in models] + ) return True except Exception as e: self.logger.error("Error in setup_ai: %s", e) - return False \ No newline at end of file + return False diff --git a/ai_integration.py b/ai_integration.py index 1ab8678..a3f5b4e 100755 --- a/ai_integration.py +++ b/ai_integration.py @@ -16,21 +16,22 @@ import asyncio import ctypes + class OllamaClient: def __init__(self, base_url: str = "http://localhost:11434"): self.base_url = base_url self.logger = logging.getLogger(__name__) - + # Load configuration from environment - self.main_model = os.getenv("OLLAMA_MAIN_MODEL", "deepseek-coder-v2:16b-lite-base-q4_0") - self.assistant_model = os.getenv("OLLAMA_ASSISTANT_MODEL", "mistral:7b-instruct-v0.2-q4_0") - - self.backup_models = [ - "deepseek-coder:6.7b", - "codellama:7b", - "mistral:7b" - ] - + self.main_model = os.getenv( + "OLLAMA_MAIN_MODEL", "deepseek-coder-v2:16b-lite-base-q4_0" + ) + self.assistant_model = os.getenv( + "OLLAMA_ASSISTANT_MODEL", "mistral:7b-instruct-v0.2-q4_0" + ) + + self.backup_models = ["deepseek-coder:6.7b", "codellama:7b", "mistral:7b"] + def is_available(self) -> bool: """Check if Ollama service is running""" try: @@ -38,97 +39,111 @@ def is_available(self) -> bool: return response.status_code == 200 except (requests.RequestException, requests.Timeout, requests.ConnectionError): return False - + def list_models(self) -> List[Dict]: """List available models""" try: response = requests.get(f"{self.base_url}/api/tags") if response.status_code == 200: - return response.json().get('models', []) - except (requests.RequestException, requests.Timeout, requests.ConnectionError) as e: + return response.json().get("models", []) + except ( + requests.RequestException, + requests.Timeout, + requests.ConnectionError, + ) as e: self.logger.error("Error listing models: %s", e) return [] - + def pull_model(self, model: str) -> bool: """Pull a model if not available""" try: self.logger.info("Pulling model: %s", model) data = {"name": model} - response = requests.post(f"{self.base_url}/api/pull", json=data, stream=True) - + response = requests.post( + f"{self.base_url}/api/pull", json=data, stream=True + ) + for line in response.iter_lines(): if line: try: - status = json.loads(line.decode('utf-8')) - if status.get('status') == 'success': + status = json.loads(line.decode("utf-8")) + if status.get("status") == "success": return True except: continue except Exception as e: self.logger.error("Error pulling model %s: %s", model, e) return False - - def generate(self, prompt: str, model: str = None, system_prompt: str = None) -> Optional[str]: + + def generate( + self, prompt: str, model: str = None, system_prompt: str = None + ) -> Optional[str]: """Generate text using Ollama""" if not model: model = self.get_best_model() - + if not model: self.logger.error("No suitable model available") return None - + try: data = { "model": model, "prompt": prompt, "stream": False, "options": { - "temperature": 0.5, # Lowered for more deterministic planning + "temperature": 0.5, # Lowered for more deterministic planning "top_p": 0.9, "max_tokens": 4096, - "num_ctx": 16384, # Increased context window for large prompts + "num_ctx": 16384, # Increased context window for large prompts "num_thread": 8, - "repeat_penalty": 1.1 - } + "repeat_penalty": 1.1, + }, } - + if system_prompt: data["system"] = system_prompt - - response = requests.post(f"{self.base_url}/api/generate", json=data, timeout=300) # Increased timeout - + + response = requests.post( + f"{self.base_url}/api/generate", json=data, timeout=300 + ) # Increased timeout + if response.status_code == 200: result = response.json() - return result.get('response', '').strip() + return result.get("response", "").strip() else: self.logger.error("Ollama API error: %s", response.status_code) - - except (requests.RequestException, requests.Timeout, requests.ConnectionError) as e: + + except ( + requests.RequestException, + requests.Timeout, + requests.ConnectionError, + ) as e: self.logger.error("Error generating with Ollama: %s", e) - + return None - + def get_best_model(self) -> Optional[str]: """Get the best available model""" - available_models = [m['name'] for m in self.list_models()] - + available_models = [m["name"] for m in self.list_models()] + # Check main model first (deepseek-coder-v2:16b-lite-base-q4_0) if self.main_model in available_models: return self.main_model - + # Check assistant model next (mistral:7b-instruct-v0.2-q4_0) if self.assistant_model in available_models: return self.assistant_model - + # Check backup models for model in self.backup_models: if model in available_models: return model - + # If no preferred models, return first available if available_models: return available_models[0] - + return None @@ -148,20 +163,23 @@ def get_best_model(self) -> Optional[str]: c_parser = None C_PARSER_AVAILABLE = False import logging - logging.getLogger(__name__).warning("C parser library not found. Using Python fallback.") + + logging.getLogger(__name__).warning( + "C parser library not found. Using Python fallback." + ) class AIAnalyzer: def __init__(self, ollama_client: OllamaClient): self.ollama = ollama_client self.logger = logging.getLogger(__name__) - + def analyze_nmap_output(self, nmap_output: str) -> Dict[str, Any]: """Analyze nmap scan results using AI""" system_prompt = """You are a cybersecurity expert analyzing nmap scan results. Identify potential vulnerabilities, interesting services, and security issues. Provide structured analysis in JSON format with severity levels.""" - + prompt = f""" Analyze this nmap scan output and identify potential security issues: @@ -184,28 +202,28 @@ def analyze_nmap_output(self, nmap_output: str) -> Dict[str, Any]: "next_steps": ["recommended follow-up actions"] }} """ - + response = self.ollama.generate(prompt, system_prompt=system_prompt) if response: try: return json.loads(response) except json.JSONDecodeError: # Extract JSON if wrapped in markdown - json_match = re.search(r'```json\n(.*?)\n```', response, re.DOTALL) + json_match = re.search(r"```json\n(.*?)\n```", response, re.DOTALL) if json_match: try: return json.loads(json_match.group(1)) except json.JSONDecodeError: pass - + return {"error": "Failed to analyze nmap output", "raw_response": response} - + def generate_exploit_code(self, vulnerability_info: Dict) -> str: """Generate exploit code based on vulnerability information""" system_prompt = """You are a security researcher creating proof-of-concept exploit code. Generate safe, educational exploit code with proper error handling and comments. Include safety warnings and ethical use disclaimers.""" - + prompt = f""" Generate a Python proof-of-concept exploit for this vulnerability: @@ -224,18 +242,18 @@ def generate_exploit_code(self, vulnerability_info: Dict) -> str: Generate complete, working Python code: """ - + response = self.ollama.generate(prompt, system_prompt=system_prompt) return response or "# Failed to generate exploit code" - + def analyze_web_response(self, url: str, response_data: Dict) -> Dict[str, Any]: """Analyze web service response for vulnerabilities""" system_prompt = """You are a web application security expert. Analyze HTTP responses for potential vulnerabilities and security issues.""" - - headers = response_data.get('headers', {}) - content = response_data.get('content', '')[:2000] # Limit content length - + + headers = response_data.get("headers", {}) + content = response_data.get("content", "")[:2000] # Limit content length + prompt = f""" Analyze this web service for security issues: @@ -270,26 +288,28 @@ def analyze_web_response(self, url: str, response_data: Dict) -> Dict[str, Any]: "recommendations": ["security recommendations"] }} """ - + response = self.ollama.generate(prompt, system_prompt=system_prompt) if response: try: return json.loads(response) except json.JSONDecodeError: - json_match = re.search(r'```json\n(.*?)\n```', response, re.DOTALL) + json_match = re.search(r"```json\n(.*?)\n```", response, re.DOTALL) if json_match: try: return json.loads(json_match.group(1)) except json.JSONDecodeError: pass - + return {"error": "Failed to analyze web response", "raw_response": response} - - def fix_broken_tool(self, tool_name: str, error_output: str, source_code: str = None) -> str: + + def fix_broken_tool( + self, tool_name: str, error_output: str, source_code: str = None + ) -> str: """Generate fixes for broken security tools""" system_prompt = """You are a DevOps engineer specializing in fixing broken security tools. Analyze errors and provide working solutions.""" - + prompt = f""" This security tool is broken and needs fixing: @@ -311,15 +331,15 @@ def fix_broken_tool(self, tool_name: str, error_output: str, source_code: str = - Missing environment variables - Network/permission issues """ - + response = self.ollama.generate(prompt, system_prompt=system_prompt) return response or "# Failed to generate fix" - + def prioritize_vulnerabilities(self, vulnerabilities: List[Dict]) -> List[Dict]: """Use AI to prioritize vulnerabilities by exploitability and impact""" system_prompt = """You are a penetration tester prioritizing vulnerabilities. Rank vulnerabilities by exploitability and business impact.""" - + prompt = f""" Prioritize these vulnerabilities for testing: @@ -346,35 +366,42 @@ def prioritize_vulnerabilities(self, vulnerabilities: List[Dict]) -> List[Dict]: ] }} """ - + response = self.ollama.generate(prompt, system_prompt=system_prompt) if response: try: result = json.loads(response) - return result.get('prioritized_vulnerabilities', vulnerabilities) + return result.get("prioritized_vulnerabilities", vulnerabilities) except: pass - + return vulnerabilities def analyze_nuclei_output(self, nuclei_json_output: str) -> Dict[str, Any]: """Analyze nuclei output using the high-speed C parser.""" - + if C_PARSER_AVAILABLE and c_parser: # Use the high-speed C parser when available - raw_summary = c_parser.parse_nuclei_output(nuclei_json_output.encode('utf-8')) - summary_str = raw_summary.decode('utf-8') + raw_summary = c_parser.parse_nuclei_output( + nuclei_json_output.encode("utf-8") + ) + summary_str = raw_summary.decode("utf-8") else: # SECURITY FIX: Fallback to pure Python implementation # This ensures the application works even without the native library try: import json + data = json.loads(nuclei_json_output) - critical_count = sum(1 for item in data if item.get('info', {}).get('severity') == 'critical') + critical_count = sum( + 1 + for item in data + if item.get("info", {}).get("severity") == "critical" + ) summary_str = json.dumps({"critical_findings": critical_count}) except (json.JSONDecodeError, TypeError): summary_str = '{"error": "Failed to parse nuclei output"}' - + try: return json.loads(summary_str) except json.JSONDecodeError: @@ -386,6 +413,7 @@ class AIOrchestrator: Manages a multi-step AI reasoning pipeline for complex security tasks. It chains specialized prompts for planning, tool selection, and execution. """ + def __init__(self, prompt_dir: Path): self.prompt_dir = prompt_dir self.ollama = OllamaClient() @@ -400,7 +428,9 @@ def _load_prompts(self): with open(self.prompt_dir / "Devin AI/system.md", "r") as f: prompts["planner"] = f.read() # Manus-style tool selection prompt - with open(self.prompt_dir / "Manus Agent Tools & Prompt/system.md", "r") as f: + with open( + self.prompt_dir / "Manus Agent Tools & Prompt/system.md", "r" + ) as f: prompts["tool_selector"] = f.read() # Cursor-style code/analysis prompt with open(self.prompt_dir / "Cursor Prompts/prompts.md", "r") as f: @@ -415,25 +445,25 @@ def execute_task(self, task_description: str): Executes a full task pipeline: Plan -> Select Tool -> Execute -> Analyze. """ print("--- AI Task Pipeline Initiated ---") - + # 1. Planning Phase (using Devin's prompt) plan = self._planning_phase(task_description) - self.state['plan'] = plan + self.state["plan"] = plan print(f"Phase 1: Plan Created -> {plan}") # 2. Tool Selection Phase (using Manus' prompt) tool_command = self._tool_selection_phase(task_description, plan) - self.state['tool_command'] = tool_command + self.state["tool_command"] = tool_command print(f"Phase 2: Tool Selected -> {tool_command}") # 3. Execution Phase (simulated) execution_result = self._execution_phase(tool_command) - self.state['execution_result'] = execution_result + self.state["execution_result"] = execution_result print(f"Phase 3: Execution Result -> {execution_result[:100]}...") # 4. Analysis Phase (using Cursor's prompt) analysis = self._analysis_phase(execution_result) - self.state['analysis'] = analysis + self.state["analysis"] = analysis print(f"Phase 4: Analysis Complete -> {analysis}") print("--- AI Task Pipeline Complete ---") @@ -441,18 +471,18 @@ def execute_task(self, task_description: str): def _planning_phase(self, task: str) -> str: """Uses the 'planner' prompt to create a high-level strategy.""" - system_prompt = self.prompts['planner'] + system_prompt = self.prompts["planner"] user_prompt = f"Create a step-by-step plan for the following task: {task}" response = self.ollama.generate(user_prompt, system_prompt=system_prompt) return response def _tool_selection_phase(self, task: str, plan: str) -> str: """Uses the 'tool_selector' prompt to choose the right command.""" - system_prompt = self.prompts['tool_selector'] + system_prompt = self.prompts["tool_selector"] user_prompt = f"Given the task '{task}' and the plan '{plan}', what is the exact shell command to execute next? Only output the command." response = self.ollama.generate(user_prompt, system_prompt=system_prompt) return response - + def _execution_phase(self, command: str) -> str: """Simulates running the command and returns mock output.""" print(f"Simulating execution of: `{command}`") @@ -462,7 +492,7 @@ def _execution_phase(self, command: str) -> str: def _analysis_phase(self, result: str) -> str: """Uses the 'analyst' prompt to interpret the results.""" - system_prompt = self.prompts['analyst'] + system_prompt = self.prompts["analyst"] user_prompt = f"Analyze the following tool output and provide a summary of key findings and recommendations:\n\n{result}" response = self.ollama.generate(user_prompt, system_prompt=system_prompt) return response @@ -471,43 +501,51 @@ def _analysis_phase(self, result: str) -> str: # CLI interface for AI module if __name__ == "__main__": import argparse - + parser = argparse.ArgumentParser(description="VulnForge AI Module") - parser.add_argument("--test-connection", action="store_true", help="Test Ollama connection") + parser.add_argument( + "--test-connection", action="store_true", help="Test Ollama connection" + ) parser.add_argument("--pull-model", help="Pull a specific model") - parser.add_argument("--list-models", action="store_true", help="List available models") + parser.add_argument( + "--list-models", action="store_true", help="List available models" + ) parser.add_argument("--analyze-nmap", help="Analyze nmap output file") parser.add_argument( - "--ai-pipeline", action="store_true", help="Enable the advanced multi-prompt AI pipeline." + "--ai-pipeline", + action="store_true", + help="Enable the advanced multi-prompt AI pipeline.", ) parser.add_argument( - "--prompt-dir", help="Directory for the AI pipeline prompts.", default="AI_Propmt/system-prompts-and-models-of-ai-tools" + "--prompt-dir", + help="Directory for the AI pipeline prompts.", + default="AI_Propmt/system-prompts-and-models-of-ai-tools", ) - + args = parser.parse_args() - + orchestrator = AIOrchestrator(Path(args.prompt_dir)) - + if args.test_connection: if orchestrator.ollama.is_available(): print("✓ AI system ready") else: print("✗ AI system not available") - + elif args.pull_model: if orchestrator.ollama.pull_model(args.pull_model): print(f"✓ Model {args.pull_model} pulled successfully") else: print(f"✗ Failed to pull model {args.pull_model}") - + elif args.list_models: models = orchestrator.ollama.list_models() print("Available models:") for model in models: print(f" - {model['name']}") - + elif args.analyze_nmap: - with open(args.analyze_nmap, 'r') as f: + with open(args.analyze_nmap, "r") as f: content = f.read() result = orchestrator.analyzer.analyze_nmap_output(content) print(json.dumps(result, indent=2)) @@ -515,11 +553,13 @@ def _analysis_phase(self, result: str) -> str: # Handle AI Pipeline Mode if args.ai_pipeline: if not args.target: - print("Error: A target is required for AI pipeline mode, e.g., --target 'scan example.com'") - + print( + "Error: A target is required for AI pipeline mode, e.g., --target 'scan example.com'" + ) + prompt_path = Path(args.prompt_dir) if not prompt_path.exists(): print(f"Error: Prompt directory not found at '{prompt_path}'") - + orchestrator = AIOrchestrator(prompt_path) - orchestrator.execute_task(f"Perform a security scan on {args.target}") \ No newline at end of file + orchestrator.execute_task(f"Perform a security scan on {args.target}") diff --git a/conftest.py b/conftest.py index a76b773..e6622b2 100644 --- a/conftest.py +++ b/conftest.py @@ -1,4 +1,3 @@ - import os import sys diff --git a/utils/notifier.py b/utils/notifier.py index aa33944..70d83c9 100755 --- a/utils/notifier.py +++ b/utils/notifier.py @@ -14,16 +14,17 @@ from typing import Dict, List, Optional, Any from datetime import datetime, timezone + class Notifier: """Handles notifications for various channels""" - + def __init__(self, base_dir: str, config_path: str): """Initialize notifier with configuration""" self.base_dir = Path(base_dir) self.config_path = Path(config_path) self.logger = logging.getLogger(__name__) self.config = self._load_config() - + def _load_config(self) -> Dict: """Load notification configuration""" try: @@ -32,46 +33,44 @@ def _load_config(self) -> Dict: except Exception as e: self.logger.error(f"Failed to load config: {e}") return {"notifications": {"enabled": False}} - + async def notify( self, message: str, severity: str = "info", data: Optional[Dict] = None, - channels: Optional[List[str]] = None + channels: Optional[List[str]] = None, ) -> None: """Send notification to specified channels""" if not self.config["notifications"]["enabled"]: return - + if channels is None: channels = [] - + await self._process_notification(message, severity, data, channels) - + async def _process_notification( - self, - message: str, - severity: str, - data: Optional[Dict], - channels: List[str] + self, message: str, severity: str, data: Optional[Dict], channels: List[str] ) -> None: """Process notification for each channel""" tasks = [] - + if "email" in channels and self.config["notifications"]["email"]["enabled"]: tasks.append(self._send_email(message, severity, data)) - + if "discord" in channels and self.config["notifications"]["discord"]["enabled"]: tasks.append(self._send_discord(message, severity, data)) - + if "webhook" in channels and self.config["notifications"]["webhook"]["enabled"]: tasks.append(self._send_webhook(message, severity, data)) - + if tasks: await asyncio.gather(*tasks, return_exceptions=True) - - async def _send_email(self, message: str, severity: str, data: Optional[Dict]) -> None: + + async def _send_email( + self, message: str, severity: str, data: Optional[Dict] + ) -> None: """Send email notification""" try: email_config = self.config["notifications"]["email"] @@ -79,77 +78,84 @@ async def _send_email(self, message: str, severity: str, data: Optional[Dict]) - msg["From"] = email_config["username"] msg["To"] = email_config["username"] msg["Subject"] = f"VulnForge Alert: {severity.upper()}" - + body = f"Message: {message}\nSeverity: {severity}\n" if data: body += f"\nAdditional Data:\n{json.dumps(data, indent=2)}" - + msg.attach(MIMEText(body, "plain")) - - with smtplib.SMTP(email_config["smtp_server"], email_config["smtp_port"]) as server: + + with smtplib.SMTP( + email_config["smtp_server"], email_config["smtp_port"] + ) as server: server.starttls() server.login(email_config["username"], email_config["password"]) server.send_message(msg) - + except Exception as e: self.logger.error(f"Failed to send email: {e}") - - async def _send_discord(self, message: str, severity: str, data: Optional[Dict]) -> None: + + async def _send_discord( + self, message: str, severity: str, data: Optional[Dict] + ) -> None: """Send Discord notification""" try: webhook_url = self.config["notifications"]["discord"]["webhook_url"] color = self._get_severity_color(severity) - + embed = { "title": f"VulnForge Alert: {severity.upper()}", "description": message, "color": color, - "timestamp": datetime.now(timezone.utc).isoformat() # Changed datetime.utcnow() to datetime.now(UTC) + "timestamp": datetime.now( + timezone.utc + ).isoformat(), # Changed datetime.utcnow() to datetime.now(UTC) } - + if data: embed["fields"] = [ {"name": k, "value": str(v), "inline": True} for k, v in data.items() ] - + async with aiohttp.ClientSession() as session: - response = await session.post( - webhook_url, - json={"embeds": [embed]} - ) + response = await session.post(webhook_url, json={"embeds": [embed]}) await response.text() - + except Exception as e: self.logger.error(f"Failed to send Discord notification: {e}") - - async def _send_webhook(self, message: str, severity: str, data: Optional[Dict]) -> None: + + async def _send_webhook( + self, message: str, severity: str, data: Optional[Dict] + ) -> None: """Send generic webhook notification""" try: webhook_url = self.config["notifications"]["webhook"]["url"] payload = { "message": message, "severity": severity, - "timestamp": datetime.now(timezone.utc).isoformat() # Changed datetime.utcnow() to datetime.now(UTC) + "timestamp": datetime.now( + timezone.utc + ).isoformat(), # Changed datetime.utcnow() to datetime.now(UTC) } - + if data: payload["data"] = data - + async with aiohttp.ClientSession() as session: response = await session.post(webhook_url, json=payload) await response.text() - + except Exception as e: self.logger.error(f"Failed to send webhook notification: {e}") - + def _get_severity_color(self, severity: str) -> int: """Get color code for severity level""" colors = { "critical": 0xFF0000, # Red - "high": 0xFFA500, # Orange - "medium": 0xFFFF00, # Yellow - "low": 0x00FF00, # Green - "info": 0x0000FF # Blue + "high": 0xFFA500, # Orange + "medium": 0xFFFF00, # Yellow + "low": 0x00FF00, # Green + "info": 0x0000FF, # Blue } - return colors.get(severity.lower(), 0x808080) # Default to gray \ No newline at end of file + return colors.get(severity.lower(), 0x808080) # Default to gray diff --git a/vulnforge_main.py b/vulnforge_main.py index ce7792b..2d6c7a2 100755 --- a/vulnforge_main.py +++ b/vulnforge_main.py @@ -27,7 +27,7 @@ from recon_module import EnhancedReconModule from ai_integration import AIAnalyzer, OllamaClient -from ai_orchestrator import AIOrchestrator # New Import +from ai_orchestrator import AIOrchestrator # New Import from modules.darkweb import ( run_darkweb_osint, ROBIN_DEFAULT_MODEL, @@ -135,8 +135,12 @@ def install_missing_tools(self): self.logger.info("Installing %s...", tool) try: # SECURITY FIX: Use full executable path and handle subprocess failures - result = subprocess.run(["/usr/bin/go", "install", package], - capture_output=True, text=True, check=True) + result = subprocess.run( + ["/usr/bin/go", "install", package], + capture_output=True, + text=True, + check=True, + ) self.logger.info("Successfully installed %s", tool) except subprocess.CalledProcessError as e: self.logger.error("Failed to install %s: %s", tool, e) @@ -150,7 +154,9 @@ def install_missing_tools(self): async def run_recon(self, target: str, output_dir: Optional[Path] = None): """Run reconnaissance on target""" - recon = EnhancedReconModule(self.base_dir, self.ai_analyzer, config_path="configs/tools.json") + recon = EnhancedReconModule( + self.base_dir, self.ai_analyzer, config_path="configs/tools.json" + ) return await recon.run_recon(target, output_dir) def ask_ai(self, question: str): @@ -389,7 +395,7 @@ async def _async_main(): For detailed documentation, visit: https://github.com/Arunking9/VulnForge """, - formatter_class=argparse.RawDescriptionHelpFormatter + formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument("--target", "-t", help="Target domain or IP") parser.add_argument( @@ -425,54 +431,78 @@ async def _async_main(): "--stealth", "-s", action="store_true", help="Enable stealth mode" ) parser.add_argument( - "--ai-pipeline", action="store_true", help="Enable the advanced multi-prompt AI pipeline." + "--ai-pipeline", + action="store_true", + help="Enable the advanced multi-prompt AI pipeline.", ) parser.add_argument( - "--prompt-dir", help="Directory for the AI pipeline prompts.", default="AI_Propmt/system-prompts-and-models-of-ai-tools" + "--prompt-dir", + help="Directory for the AI pipeline prompts.", + default="AI_Propmt/system-prompts-and-models-of-ai-tools", ) # Add subparsers for commands - subparsers = parser.add_subparsers(dest='command', help='Available commands') - + subparsers = parser.add_subparsers(dest="command", help="Available commands") + # Ask AI command - ask_ai_parser = subparsers.add_parser('ask-ai', help='Ask the AI assistant a question') - ask_ai_parser.add_argument('question', help='The question to ask the AI') - ask_ai_parser.add_argument('--verbose', action='store_true', help='Show detailed model logs') - ask_ai_parser.add_argument('--dangerous', action='store_true', help='Enable dangerous mode') - ask_ai_parser.add_argument('--confirm-danger', action='store_true', help='Confirm dangerous mode') - + ask_ai_parser = subparsers.add_parser( + "ask-ai", help="Ask the AI assistant a question" + ) + ask_ai_parser.add_argument("question", help="The question to ask the AI") + ask_ai_parser.add_argument( + "--verbose", action="store_true", help="Show detailed model logs" + ) + ask_ai_parser.add_argument( + "--dangerous", action="store_true", help="Enable dangerous mode" + ) + ask_ai_parser.add_argument( + "--confirm-danger", action="store_true", help="Confirm dangerous mode" + ) + # Generate tool command - generate_tool_parser = subparsers.add_parser('generate-tool', help='Generate a custom tool') - generate_tool_parser.add_argument('description', help='Description of the tool to generate') - generate_tool_parser.add_argument('--verbose', action='store_true', help='Show detailed generation logs') - + generate_tool_parser = subparsers.add_parser( + "generate-tool", help="Generate a custom tool" + ) + generate_tool_parser.add_argument( + "description", help="Description of the tool to generate" + ) + generate_tool_parser.add_argument( + "--verbose", action="store_true", help="Show detailed generation logs" + ) + # List tools command - list_tools_parser = subparsers.add_parser('list-tools', help='List all custom tools') - list_tools_parser.add_argument('--verbose', action='store_true', help='Show detailed tool information') + list_tools_parser = subparsers.add_parser( + "list-tools", help="List all custom tools" + ) + list_tools_parser.add_argument( + "--verbose", action="store_true", help="Show detailed tool information" + ) # Dark web OSINT command (Robin integration) darkweb_parser = subparsers.add_parser( - 'darkweb', help='Run the Robin dark web OSINT workflow' + "darkweb", help="Run the Robin dark web OSINT workflow" + ) + darkweb_parser.add_argument( + "--query", "-q", required=True, help="Dark web search query" ) - darkweb_parser.add_argument('--query', '-q', required=True, help='Dark web search query') darkweb_parser.add_argument( - '--model', - '-m', + "--model", + "-m", choices=get_robin_model_choices(), default=ROBIN_DEFAULT_MODEL, - help='LLM model to use for refinement/filtering', + help="LLM model to use for refinement/filtering", ) darkweb_parser.add_argument( - '--threads', - '-t', + "--threads", + "-t", type=int, default=5, - help='Number of concurrent requests for search/scrape', + help="Number of concurrent requests for search/scrape", ) darkweb_parser.add_argument( - '--output', - '-o', - help='Optional output file or directory for the markdown report', + "--output", + "-o", + help="Optional output file or directory for the markdown report", ) args = parser.parse_args() @@ -484,14 +514,16 @@ async def _async_main(): # Handle AI Pipeline Mode if args.ai_pipeline: if not args.target: - print("Error: A target is required for AI pipeline mode, e.g., --target 'scan example.com'") + print( + "Error: A target is required for AI pipeline mode, e.g., --target 'scan example.com'" + ) return - + prompt_path = Path(args.prompt_dir) if not prompt_path.exists(): print(f"Error: Prompt directory not found at '{prompt_path}'") return - + orchestrator = AIOrchestrator(prompt_path) orchestrator.execute_task(f"Perform a security scan on {args.target}") return @@ -625,5 +657,6 @@ def main(): """Synchronous entrypoint for console_scripts.""" asyncio.run(_async_main()) + if __name__ == "__main__": main()