diff --git a/ai_controller.py b/ai_controller.py index cd4f5bc..c40c88a 100644 --- a/ai_controller.py +++ b/ai_controller.py @@ -22,12 +22,13 @@ from utils.context_builder import ContextBuilder from utils.notifier import Notifier + class AIController: """Controls AI operations and decision making.""" def __init__(self, session_dir: str, config_path: str): """Initialize the AI controller. - + Args: session_dir: Directory to store session data config_path: Path to configuration file @@ -39,7 +40,7 @@ def __init__(self, session_dir: str, config_path: str): self.notifier = Notifier(session_dir, config_path) self.report_generator = ReportGenerator(self.session_dir) self.output_format = "all" # Default output format - + # Initialize session data self.session_data = { "start_time": datetime.now().isoformat(), @@ -48,43 +49,48 @@ def __init__(self, session_dir: str, config_path: str): "findings": [], "execution_history": [], "ai_decisions": [], - "errors": [] + "errors": [], } @RateLimiter(max_calls=5, time_window=60) - async def process_query(self, query: str, context: Optional[Dict[str, Any]] = None, - session_id: Optional[str] = None, identifier: str = 'default') -> Dict[str, Any]: + async def process_query( + self, + query: str, + context: Optional[Dict[str, Any]] = None, + session_id: Optional[str] = None, + identifier: str = "default", + ) -> Dict[str, Any]: """Process an AI query with authentication and rate limiting. - + Args: query: The query to process context: Optional context data session_id: Optional session ID for authentication identifier: Rate limit identifier - + Returns: Dictionary containing the response and metadata - + Raises: PermissionError: If authentication fails """ # SECURITY: Validate input if not isinstance(query, str) or not query.strip(): raise ValueError("Query must be a non-empty string") - + # SECURITY: Limit query length to prevent abuse if len(query) > 5000: raise ValueError("Query too long (max 5000 characters)") - + # SECURITY: Sanitize query for logging safe_query = SecurityValidator.sanitize_log_input(query[:100]) self.logger.info("Processing AI query: %s...", safe_query) - + # SECURITY: Check authentication if session_id provided if session_id: auth_manager = get_auth_manager() auth_manager.require_permission(session_id, Permission.AI_QUERY) - + try: # Prepare context context_str = "" @@ -93,7 +99,7 @@ async def process_query(self, query: str, context: Optional[Dict[str, Any]] = No if not isinstance(context, dict): raise ValueError("Context must be a dictionary") context_str = json.dumps(context, indent=2) - + # Create prompt prompt = f"""You are a security expert. Please provide guidance on the following query: @@ -110,20 +116,20 @@ async def process_query(self, query: str, context: Optional[Dict[str, Any]] = No 5. Safety considerations Provide a detailed response:""" - + # Get AI response response = await self._get_ai_response(prompt) - + # Log response self.logger.info("AI response received") - + return { "answer": response, # Return an empty log list as the logger doesn't expose in-memory logs "logs": [], - "prompt": prompt + "prompt": prompt, } - + except ValueError as e: # SECURITY: Don't expose internal errors self.logger.error("Validation error: %s", str(e)) @@ -135,52 +141,71 @@ async def process_query(self, query: str, context: Optional[Dict[str, Any]] = No async def execute_intent(self, intent: Dict[str, Any]) -> Dict[str, Any]: """Execute a structured action intent from the Agentic AI. - + Args: intent: The structured intent JSON. - + Returns: Result of the execution. """ - self.logger.info("Executing intent: %s on %s", intent.get("type"), intent.get("target")) - + self.logger.info( + "Executing intent: %s on %s", intent.get("type"), intent.get("target") + ) + action_type = intent.get("type") target = intent.get("target") value = intent.get("value") - + try: if action_type == "module_call": if target == "recon_scan": # Placeholder for recon trigger logic - return {"status": "success", "message": f"Triggered recon scan on {value}"} + return { + "status": "success", + "message": f"Triggered recon scan on {value}", + } elif target == "robin_search": - return {"status": "success", "message": f"Triggered Robin search for {value}"} - + return { + "status": "success", + "message": f"Triggered Robin search for {value}", + } + elif action_type == "tool_call": if target == "install_tool": - return {"status": "info", "message": f"Suggested installation of tool: {value}"} - + return { + "status": "info", + "message": f"Suggested installation of tool: {value}", + } + elif action_type == "ui_click": return {"status": "ui_intent", "action": "navigate", "tab": target} - + elif action_type == "ui_input": - return {"status": "ui_intent", "action": "fill", "element": target, "value": value} - - return {"status": "error", "message": f"Unknown intent type or target: {action_type}/{target}"} - + return { + "status": "ui_intent", + "action": "fill", + "element": target, + "value": value, + } + + return { + "status": "error", + "message": f"Unknown intent type or target: {action_type}/{target}", + } + except Exception as e: self.logger.error(f"Intent execution failed: {e}") return {"status": "error", "message": str(e)} async def _get_ai_response(self, prompt: str) -> str: """Get response from AI model with timeout. - + Args: prompt: The prompt to send to the AI - + Returns: The AI's response - + Raises: TimeoutError: If request times out """ @@ -190,7 +215,7 @@ async def _get_ai_response(self, prompt: str) -> str: # TODO: Implement actual AI model call # For now, return a placeholder response return "This is a placeholder response. AI integration pending." - + except asyncio.TimeoutError: self.logger.error("AI request timed out") raise TimeoutError("AI request timed out after 300 seconds") @@ -200,12 +225,12 @@ async def _get_ai_response(self, prompt: str) -> str: def _generate_report(self) -> Dict[str, str]: """Generate reports for the current session. - + Returns: Dictionary mapping report types to their file paths """ self.logger.info("Generating reports in %s format...", self.output_format) - + try: # Prepare context data context = { @@ -213,7 +238,7 @@ def _generate_report(self) -> Dict[str, str]: "domain": self.session_data.get("target_domain"), "ip": self.session_data.get("target_ip"), "scan_time": self.session_data.get("start_time"), - "duration": self._calculate_duration() + "duration": self._calculate_duration(), }, "modules": self.session_data["modules"], "tools": self.session_data["tools"], @@ -221,45 +246,50 @@ def _generate_report(self) -> Dict[str, str]: "exploits": self.session_data.get("execution_history", []), "defensive_measures": self.session_data.get("defensive_measures", []), "recommendations": self.session_data.get("recommendations", []), - "errors": self.session_data["errors"] + "errors": self.session_data["errors"], } - + # Generate reports - report_paths = self.report_generator.generate_reports(context, self.output_format) - + report_paths = self.report_generator.generate_reports( + context, self.output_format + ) + self.logger.info("Reports generated successfully") return report_paths - + except Exception as e: self.logger.error("Error generating reports: %s", str(e)) raise def _calculate_duration(self) -> str: """Calculate session duration. - + Returns: Formatted duration string """ start_time = datetime.fromisoformat(self.session_data["start_time"]) end_time = datetime.now() duration = end_time - start_time - + hours = duration.seconds // 3600 minutes = (duration.seconds % 3600) // 60 seconds = duration.seconds % 60 - + return f"{hours:02d}:{minutes:02d}:{seconds:02d}" def setup_ai(self) -> bool: """Setup AI environment: check Ollama service and model availability.""" try: - if hasattr(self, 'ollama'): + if hasattr(self, "ollama"): client = self.ollama else: from ai_integration import OllamaClient + client = OllamaClient() if not client.is_available(): - self.logger.error("Ollama service not available. Start with: ollama serve") + self.logger.error( + "Ollama service not available. Start with: ollama serve" + ) return False models = client.list_models() if not models: @@ -267,8 +297,10 @@ def setup_ai(self) -> bool: if not client.pull_model(client.main_model): self.logger.error("Failed to pull default model") return False - self.logger.info("AI setup complete. Available models: %s", [m['name'] for m in models]) + self.logger.info( + "AI setup complete. Available models: %s", [m["name"] for m in models] + ) return True except Exception as e: self.logger.error("Error in setup_ai: %s", e) - return False \ No newline at end of file + return False diff --git a/ai_integration.py b/ai_integration.py index c5fc2d4..ff5fe8d 100755 --- a/ai_integration.py +++ b/ai_integration.py @@ -16,21 +16,22 @@ import asyncio import ctypes + class OllamaClient: def __init__(self, base_url: str = "http://localhost:11434"): self.base_url = base_url self.logger = logging.getLogger(__name__) - + # Load configuration from environment - self.main_model = os.getenv("OLLAMA_MAIN_MODEL", "deepseek-coder-v2:16b-lite-base-q4_0") - self.assistant_model = os.getenv("OLLAMA_ASSISTANT_MODEL", "mistral:7b-instruct-v0.2-q4_0") - - self.backup_models = [ - "deepseek-coder:6.7b", - "codellama:7b", - "mistral:7b" - ] - + self.main_model = os.getenv( + "OLLAMA_MAIN_MODEL", "deepseek-coder-v2:16b-lite-base-q4_0" + ) + self.assistant_model = os.getenv( + "OLLAMA_ASSISTANT_MODEL", "mistral:7b-instruct-v0.2-q4_0" + ) + + self.backup_models = ["deepseek-coder:6.7b", "codellama:7b", "mistral:7b"] + def is_available(self) -> bool: """Check if Ollama service is running""" try: @@ -38,100 +39,118 @@ def is_available(self) -> bool: return response.status_code == 200 except (requests.RequestException, requests.Timeout, requests.ConnectionError): return False - + def list_models(self) -> List[Dict]: """List available models""" try: response = requests.get(f"{self.base_url}/api/tags") if response.status_code == 200: - return response.json().get('models', []) - except (requests.RequestException, requests.Timeout, requests.ConnectionError) as e: + return response.json().get("models", []) + except ( + requests.RequestException, + requests.Timeout, + requests.ConnectionError, + ) as e: self.logger.error("Error listing models: %s", e) return [] - + def pull_model(self, model: str) -> bool: """Pull a model if not available""" try: self.logger.info("Pulling model: %s", model) data = {"name": model} - response = requests.post(f"{self.base_url}/api/pull", json=data, stream=True) - + response = requests.post( + f"{self.base_url}/api/pull", json=data, stream=True + ) + for line in response.iter_lines(): if line: try: - status = json.loads(line.decode('utf-8')) - if status.get('status') == 'success': + status = json.loads(line.decode("utf-8")) + if status.get("status") == "success": return True except: continue except Exception as e: self.logger.error("Error pulling model %s: %s", model, e) return False - - def generate(self, prompt: str, model: str = None, system_prompt: str = None, format: str = None) -> Optional[str]: + + def generate( + self, + prompt: str, + model: str = None, + system_prompt: str = None, + format: str = None, + ) -> Optional[str]: """Generate text using Ollama""" if not model: model = self.get_best_model() - + if not model: self.logger.error("No suitable model available") return None - + try: data = { "model": model, "prompt": prompt, "stream": False, "options": { - "temperature": 0.5, # Lowered for more deterministic planning + "temperature": 0.5, # Lowered for more deterministic planning "top_p": 0.9, "max_tokens": 4096, - "num_ctx": 16384, # Increased context window for large prompts + "num_ctx": 16384, # Increased context window for large prompts "num_thread": 8, - "repeat_penalty": 1.1 - } + "repeat_penalty": 1.1, + }, } - + if system_prompt: data["system"] = system_prompt - + if format: data["format"] = format - - response = requests.post(f"{self.base_url}/api/generate", json=data, timeout=300) # Increased timeout - + + response = requests.post( + f"{self.base_url}/api/generate", json=data, timeout=300 + ) # Increased timeout + if response.status_code == 200: result = response.json() - return result.get('response', '').strip() + return result.get("response", "").strip() else: self.logger.error("Ollama API error: %s", response.status_code) - - except (requests.RequestException, requests.Timeout, requests.ConnectionError) as e: + + except ( + requests.RequestException, + requests.Timeout, + requests.ConnectionError, + ) as e: self.logger.error("Error generating with Ollama: %s", e) - + return None - + def get_best_model(self) -> Optional[str]: """Get the best available model""" - available_models = [m['name'] for m in self.list_models()] - + available_models = [m["name"] for m in self.list_models()] + # Check main model first (deepseek-coder-v2:16b-lite-base-q4_0) if self.main_model in available_models: return self.main_model - + # Check assistant model next (mistral:7b-instruct-v0.2-q4_0) if self.assistant_model in available_models: return self.assistant_model - + # Check backup models for model in self.backup_models: if model in available_models: return model - + # If no preferred models, return first available if available_models: return available_models[0] - + return None @@ -151,20 +170,23 @@ def get_best_model(self) -> Optional[str]: c_parser = None C_PARSER_AVAILABLE = False import logging - logging.getLogger(__name__).warning("C parser library not found. Using Python fallback.") + + logging.getLogger(__name__).warning( + "C parser library not found. Using Python fallback." + ) class AIAnalyzer: def __init__(self, ollama_client: OllamaClient): self.ollama = ollama_client self.logger = logging.getLogger(__name__) - + def analyze_nmap_output(self, nmap_output: str) -> Dict[str, Any]: """Analyze nmap scan results using AI""" system_prompt = """You are a cybersecurity expert analyzing nmap scan results. Identify potential vulnerabilities, interesting services, and security issues. Provide structured analysis in JSON format with severity levels.""" - + prompt = f""" Analyze this nmap scan output and identify potential security issues: @@ -187,28 +209,28 @@ def analyze_nmap_output(self, nmap_output: str) -> Dict[str, Any]: "next_steps": ["recommended follow-up actions"] }} """ - + response = self.ollama.generate(prompt, system_prompt=system_prompt) if response: try: return json.loads(response) except json.JSONDecodeError: # Extract JSON if wrapped in markdown - json_match = re.search(r'```json\n(.*?)\n```', response, re.DOTALL) + json_match = re.search(r"```json\n(.*?)\n```", response, re.DOTALL) if json_match: try: return json.loads(json_match.group(1)) except json.JSONDecodeError: pass - + return {"error": "Failed to analyze nmap output", "raw_response": response} - + def generate_exploit_code(self, vulnerability_info: Dict) -> str: """Generate exploit code based on vulnerability information""" system_prompt = """You are a security researcher creating proof-of-concept exploit code. Generate safe, educational exploit code with proper error handling and comments. Include safety warnings and ethical use disclaimers.""" - + prompt = f""" Generate a Python proof-of-concept exploit for this vulnerability: @@ -227,18 +249,18 @@ def generate_exploit_code(self, vulnerability_info: Dict) -> str: Generate complete, working Python code: """ - + response = self.ollama.generate(prompt, system_prompt=system_prompt) return response or "# Failed to generate exploit code" - + def analyze_web_response(self, url: str, response_data: Dict) -> Dict[str, Any]: """Analyze web service response for vulnerabilities""" system_prompt = """You are a web application security expert. Analyze HTTP responses for potential vulnerabilities and security issues.""" - - headers = response_data.get('headers', {}) - content = response_data.get('content', '')[:2000] # Limit content length - + + headers = response_data.get("headers", {}) + content = response_data.get("content", "")[:2000] # Limit content length + prompt = f""" Analyze this web service for security issues: @@ -273,26 +295,28 @@ def analyze_web_response(self, url: str, response_data: Dict) -> Dict[str, Any]: "recommendations": ["security recommendations"] }} """ - + response = self.ollama.generate(prompt, system_prompt=system_prompt) if response: try: return json.loads(response) except json.JSONDecodeError: - json_match = re.search(r'```json\n(.*?)\n```', response, re.DOTALL) + json_match = re.search(r"```json\n(.*?)\n```", response, re.DOTALL) if json_match: try: return json.loads(json_match.group(1)) except json.JSONDecodeError: pass - + return {"error": "Failed to analyze web response", "raw_response": response} - - def fix_broken_tool(self, tool_name: str, error_output: str, source_code: str = None) -> str: + + def fix_broken_tool( + self, tool_name: str, error_output: str, source_code: str = None + ) -> str: """Generate fixes for broken security tools""" system_prompt = """You are a DevOps engineer specializing in fixing broken security tools. Analyze errors and provide working solutions.""" - + prompt = f""" This security tool is broken and needs fixing: @@ -314,15 +338,15 @@ def fix_broken_tool(self, tool_name: str, error_output: str, source_code: str = - Missing environment variables - Network/permission issues """ - + response = self.ollama.generate(prompt, system_prompt=system_prompt) return response or "# Failed to generate fix" - + def prioritize_vulnerabilities(self, vulnerabilities: List[Dict]) -> List[Dict]: """Use AI to prioritize vulnerabilities by exploitability and impact""" system_prompt = """You are a penetration tester prioritizing vulnerabilities. Rank vulnerabilities by exploitability and business impact.""" - + prompt = f""" Prioritize these vulnerabilities for testing: @@ -349,35 +373,42 @@ def prioritize_vulnerabilities(self, vulnerabilities: List[Dict]) -> List[Dict]: ] }} """ - + response = self.ollama.generate(prompt, system_prompt=system_prompt) if response: try: result = json.loads(response) - return result.get('prioritized_vulnerabilities', vulnerabilities) + return result.get("prioritized_vulnerabilities", vulnerabilities) except: pass - + return vulnerabilities def analyze_nuclei_output(self, nuclei_json_output: str) -> Dict[str, Any]: """Analyze nuclei output using the high-speed C parser.""" - + if C_PARSER_AVAILABLE and c_parser: # Use the high-speed C parser when available - raw_summary = c_parser.parse_nuclei_output(nuclei_json_output.encode('utf-8')) - summary_str = raw_summary.decode('utf-8') + raw_summary = c_parser.parse_nuclei_output( + nuclei_json_output.encode("utf-8") + ) + summary_str = raw_summary.decode("utf-8") else: # SECURITY FIX: Fallback to pure Python implementation # This ensures the application works even without the native library try: import json + data = json.loads(nuclei_json_output) - critical_count = sum(1 for item in data if item.get('info', {}).get('severity') == 'critical') + critical_count = sum( + 1 + for item in data + if item.get("info", {}).get("severity") == "critical" + ) summary_str = json.dumps({"critical_findings": critical_count}) except (json.JSONDecodeError, TypeError): summary_str = '{"error": "Failed to parse nuclei output"}' - + try: return json.loads(summary_str) except json.JSONDecodeError: @@ -389,6 +420,7 @@ class AIOrchestrator: Manages a multi-step AI reasoning pipeline for complex security tasks. It chains specialized prompts for planning, tool selection, and execution. """ + def __init__(self, prompt_dir: Path): self.prompt_dir = prompt_dir self.ollama = OllamaClient() @@ -414,7 +446,9 @@ def _load_prompts(self): with open(tool_path, "r") as f: prompts["tool_selector"] = f.read() else: - prompts["tool_selector"] = "You are an expert at selecting the best security tool for a task." + prompts["tool_selector"] = ( + "You are an expert at selecting the best security tool for a task." + ) # Cursor-style code/analysis prompt analyst_path = self.prompt_dir / "Cursor Prompts" / "Cursor Prompts.txt" @@ -422,20 +456,25 @@ def _load_prompts(self): if not analyst_path.exists(): # Fallback to the first .txt file found if possible, or a default analyst_path = self.prompt_dir / "Cursor Prompts" / "System Prompt.txt" - + if analyst_path.exists(): with open(analyst_path, "r") as f: prompts["analyst"] = f.read() else: - prompts["analyst"] = "You are a senior security researcher analyzing results." - + prompts["analyst"] = ( + "You are a senior security researcher analyzing results." + ) + except Exception as e: logging.getLogger(__name__).error(f"Error loading specialized prompts: {e}") # Ensure we have defaults if everything fails prompts.setdefault("planner", "You are an expert security planner.") - prompts.setdefault("tool_selector", "You are an expert at selecting the best security tool.") + prompts.setdefault( + "tool_selector", + "You are an expert at selecting the best security tool.", + ) prompts.setdefault("analyst", "You are a senior security researcher.") - + return prompts def execute_task(self, task_description: str): @@ -443,25 +482,25 @@ def execute_task(self, task_description: str): Executes a full task pipeline: Plan -> Select Tool -> Execute -> Analyze. """ print("--- AI Task Pipeline Initiated ---") - + # 1. Planning Phase (using Devin's prompt) plan = self._planning_phase(task_description) - self.state['plan'] = plan + self.state["plan"] = plan print(f"Phase 1: Plan Created -> {plan}") # 2. Tool Selection Phase (using Manus' prompt) tool_command = self._tool_selection_phase(task_description, plan) - self.state['tool_command'] = tool_command + self.state["tool_command"] = tool_command print(f"Phase 2: Tool Selected -> {tool_command}") # 3. Execution Phase (simulated) execution_result = self._execution_phase(tool_command) - self.state['execution_result'] = execution_result + self.state["execution_result"] = execution_result print(f"Phase 3: Execution Result -> {execution_result[:100]}...") # 4. Analysis Phase (using Cursor's prompt) analysis = self._analysis_phase(execution_result) - self.state['analysis'] = analysis + self.state["analysis"] = analysis print(f"Phase 4: Analysis Complete -> {analysis}") print("--- AI Task Pipeline Complete ---") @@ -469,18 +508,18 @@ def execute_task(self, task_description: str): def _planning_phase(self, task: str) -> str: """Uses the 'planner' prompt to create a high-level strategy.""" - system_prompt = self.prompts['planner'] + system_prompt = self.prompts["planner"] user_prompt = f"Create a step-by-step plan for the following task: {task}" response = self.ollama.generate(user_prompt, system_prompt=system_prompt) return response def _tool_selection_phase(self, task: str, plan: str) -> str: """Uses the 'tool_selector' prompt to choose the right command.""" - system_prompt = self.prompts['tool_selector'] + system_prompt = self.prompts["tool_selector"] user_prompt = f"Given the task '{task}' and the plan '{plan}', what is the exact shell command to execute next? Only output the command." response = self.ollama.generate(user_prompt, system_prompt=system_prompt) return response - + def _execution_phase(self, command: str) -> str: """Simulates running the command and returns mock output.""" print(f"Simulating execution of: `{command}`") @@ -490,7 +529,7 @@ def _execution_phase(self, command: str) -> str: def _analysis_phase(self, result: str) -> str: """Uses the 'analyst' prompt to interpret the results.""" - system_prompt = self.prompts['analyst'] + system_prompt = self.prompts["analyst"] user_prompt = f"Analyze the following tool output and provide a summary of key findings and recommendations:\n\n{result}" response = self.ollama.generate(user_prompt, system_prompt=system_prompt) return response @@ -499,43 +538,51 @@ def _analysis_phase(self, result: str) -> str: # CLI interface for AI module if __name__ == "__main__": import argparse - + parser = argparse.ArgumentParser(description="VulnForge AI Module") - parser.add_argument("--test-connection", action="store_true", help="Test Ollama connection") + parser.add_argument( + "--test-connection", action="store_true", help="Test Ollama connection" + ) parser.add_argument("--pull-model", help="Pull a specific model") - parser.add_argument("--list-models", action="store_true", help="List available models") + parser.add_argument( + "--list-models", action="store_true", help="List available models" + ) parser.add_argument("--analyze-nmap", help="Analyze nmap output file") parser.add_argument( - "--ai-pipeline", action="store_true", help="Enable the advanced multi-prompt AI pipeline." + "--ai-pipeline", + action="store_true", + help="Enable the advanced multi-prompt AI pipeline.", ) parser.add_argument( - "--prompt-dir", help="Directory for the AI pipeline prompts.", default="prompts/system_prompts" + "--prompt-dir", + help="Directory for the AI pipeline prompts.", + default="prompts/system_prompts", ) - + args = parser.parse_args() - + orchestrator = AIOrchestrator(Path(args.prompt_dir)) - + if args.test_connection: if orchestrator.ollama.is_available(): print("✓ AI system ready") else: print("✗ AI system not available") - + elif args.pull_model: if orchestrator.ollama.pull_model(args.pull_model): print(f"✓ Model {args.pull_model} pulled successfully") else: print(f"✗ Failed to pull model {args.pull_model}") - + elif args.list_models: models = orchestrator.ollama.list_models() print("Available models:") for model in models: print(f" - {model['name']}") - + elif args.analyze_nmap: - with open(args.analyze_nmap, 'r') as f: + with open(args.analyze_nmap, "r") as f: content = f.read() result = orchestrator.analyzer.analyze_nmap_output(content) print(json.dumps(result, indent=2)) @@ -543,11 +590,13 @@ def _analysis_phase(self, result: str) -> str: # Handle AI Pipeline Mode if args.ai_pipeline: if not args.target: - print("Error: A target is required for AI pipeline mode, e.g., --target 'scan example.com'") - + print( + "Error: A target is required for AI pipeline mode, e.g., --target 'scan example.com'" + ) + prompt_path = Path(args.prompt_dir) if not prompt_path.exists(): print(f"Error: Prompt directory not found at '{prompt_path}'") - + orchestrator = AIOrchestrator(prompt_path) - orchestrator.execute_task(f"Perform a security scan on {args.target}") \ No newline at end of file + orchestrator.execute_task(f"Perform a security scan on {args.target}") diff --git a/modules/ai/agent.py b/modules/ai/agent.py index e71c9b1..c86f605 100644 --- a/modules/ai/agent.py +++ b/modules/ai/agent.py @@ -5,19 +5,22 @@ from typing import Dict, Any, List, Optional from ai_integration import OllamaClient + class VulnForgeAgent: """Simple Agentic AI for VulnForge. - + This agent understands the framework modules and outputs structured action intents in JSON format. """ - + def __init__(self, ollama_client: Optional[OllamaClient] = None): self.ollama = ollama_client or OllamaClient() self.logger = logging.getLogger("vulnforge.agent") - self.prompt_path = Path(__file__).resolve().parents[2] / "prompts" / "agentic_system.md" + self.prompt_path = ( + Path(__file__).resolve().parents[2] / "prompts" / "agentic_system.md" + ) self._system_prompt = self._load_system_prompt() - + def _load_system_prompt(self) -> str: """Load the agentic system prompt from file.""" try: @@ -30,13 +33,15 @@ def _load_system_prompt(self) -> str: self.logger.error(f"Error loading system prompt: {e}") return "You are a security assistant for the VulnForge framework." - async def run_task(self, task: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: + async def run_task( + self, task: str, context: Optional[Dict[str, Any]] = None + ) -> Dict[str, Any]: """Process a user task and return a structured response. - + Args: task: The user's request or task description. context: Additional context like current module, available tools, etc. - + Returns: A dictionary containing the AI's plan or response. """ @@ -44,51 +49,87 @@ async def run_task(self, task: str, context: Optional[Dict[str, Any]] = None) -> prompt = f"User Task: {task}\n" if context: prompt += f"Context: {json.dumps(context, indent=2)}\n" - + prompt += "\nRespond with a JSON object. Follow the schema exactly.\n" - + self.logger.info(f"Agent processing task: {task[:50]}...") - + # Define the schema for Ollama to enforce schema = { "type": "object", "properties": { "thought": {"type": "string"}, - "mode": {"type": "string", "enum": ["ACTION_PLAN", "ACTION_EXECUTION", "RESPONSE", "CLARIFICATION"]}, + "mode": { + "type": "string", + "enum": [ + "ACTION_PLAN", + "ACTION_EXECUTION", + "RESPONSE", + "CLARIFICATION", + ], + }, "goal": {"type": "string"}, "steps": { "type": "array", "items": { "type": "object", "properties": { - "type": {"type": "string", "enum": ["ui_click", "ui_input", "module_call", "tool_call"]}, - "target": {"type": "string", "enum": [ - "recon_scan", "robin_search", "ai_assistant", - "nmap", "subfinder", "httpx", "nuclei", "gobuster", "ffuf", "whatweb", - "Overview", "Recon", "Robin", "Tool Manager", "Assistant", "Reports", "Settings", - "domain_input", "query_input" - ]}, + "type": { + "type": "string", + "enum": [ + "ui_click", + "ui_input", + "module_call", + "tool_call", + ], + }, + "target": { + "type": "string", + "enum": [ + "recon_scan", + "robin_search", + "ai_assistant", + "nmap", + "subfinder", + "httpx", + "nuclei", + "gobuster", + "ffuf", + "whatweb", + "Overview", + "Recon", + "Robin", + "Tool Manager", + "Assistant", + "Reports", + "Settings", + "domain_input", + "query_input", + ], + }, "value": {"type": "string"}, - "reason": {"type": "string"} + "reason": {"type": "string"}, }, - "required": ["type", "target", "value", "reason"] - } + "required": ["type", "target", "value", "reason"], + }, }, - "content": {"type": "string"} + "content": {"type": "string"}, }, - "required": ["thought", "mode", "goal", "steps"] + "required": ["thought", "mode", "goal", "steps"], } - + # Pass schema to Ollama - raw_response = self.ollama.generate(prompt, system_prompt=self._system_prompt, format=schema) - + raw_response = self.ollama.generate( + prompt, system_prompt=self._system_prompt, format=schema + ) + if not raw_response: return { "mode": "RESPONSE", "content": "I apologize, but I failed to generate a response. Please check the Ollama service.", - "status": "error" + "status": "error", } - + return self._parse_response(raw_response) def _parse_response(self, raw_response: str) -> Dict[str, Any]: @@ -106,38 +147,40 @@ def _parse_response(self, raw_response: str) -> Dict[str, Any]: first_brace = raw_response.find("{") last_brace = raw_response.rfind("}") if first_brace != -1 and last_brace != -1: - json_candidate = raw_response[first_brace:last_brace+1] + json_candidate = raw_response[first_brace : last_brace + 1] try: return json.loads(json_candidate) except json.JSONDecodeError: pass # 3. If no JSON found, treat as plain response - self.logger.debug("No valid JSON found in AI response, falling back to RESPONSE mode.") + self.logger.debug( + "No valid JSON found in AI response, falling back to RESPONSE mode." + ) return { "mode": "RESPONSE", "content": raw_response, "status": "partial_success", - "parsing_error": "No valid JSON structure identified" + "parsing_error": "No valid JSON structure identified", } - + except Exception as e: self.logger.error(f"Unexpected error during parsing: {e}") return { "mode": "RESPONSE", "content": raw_response, "status": "error", - "parsing_error": str(e) + "parsing_error": str(e), } def get_readiness_status(self) -> Dict[str, Any]: """Check if the agent is ready for operation.""" is_ollama_available = self.ollama.is_available() best_model = self.ollama.get_best_model() if is_ollama_available else None - + return { "ready": is_ollama_available and best_model is not None, "ollama_available": is_ollama_available, "model_ready": best_model is not None, - "active_model": best_model + "active_model": best_model, } diff --git a/modules/web/dashboard.py b/modules/web/dashboard.py index 03e263f..562e2ef 100644 --- a/modules/web/dashboard.py +++ b/modules/web/dashboard.py @@ -34,12 +34,14 @@ initial_sidebar_state="expanded", ) + # --- Custom Styling --- def local_css(file_name): if Path(file_name).exists(): with open(file_name) as f: st.markdown(f"", unsafe_allow_html=True) + local_css(str(ASSETS_DIR / "style.css")) # --- Session State Initialization --- @@ -48,7 +50,7 @@ def local_css(file_name): if "recon" not in st.session_state: st.session_state.recon = EnhancedReconModule( base_dir=st.session_state.vulnforge.base_dir, - ai_analyzer=st.session_state.vulnforge.ai_analyzer + ai_analyzer=st.session_state.vulnforge.ai_analyzer, ) if "nav" not in st.session_state: st.session_state.nav = "Overview" @@ -60,9 +62,9 @@ def local_css(file_name): st.image(str(logo_path), width="stretch") else: st.title("🛡️ VulnForge") - + st.markdown("---") - + nav_options = { "🏠 Overview": "Overview", "🔍 Reconnaissance": "Recon", @@ -70,11 +72,15 @@ def local_css(file_name): "🛠️ Tool Manager": "Tools", "🤖 AI Assistant": "AI", "📑 Reports": "Reports", - "⚙️ Settings": "Settings" + "⚙️ Settings": "Settings", } - + for label, key in nav_options.items(): - if st.button(label, width="stretch", type="primary" if st.session_state.nav == key else "secondary"): + if st.button( + label, + width="stretch", + type="primary" if st.session_state.nav == key else "secondary", + ): st.session_state.nav = key st.rerun() @@ -83,13 +89,14 @@ def local_css(file_name): # --- Dashboard Logic --- + def show_overview(): st.title("🏠 Framework Overview") - + # Fetch real counts results_dir = st.session_state.vulnforge.results_dir recon_scans = len(list(results_dir.glob("*"))) if results_dir.exists() else 0 - + # Just an estimate for demonstration total_vulns = 0 for report in results_dir.glob("**/results.json"): @@ -97,7 +104,8 @@ def show_overview(): with open(report) as f: data = json.load(f) total_vulns += len(data.get("vulnerabilities", [])) - except: pass + except: + pass # Summary Metrics col1, col2, col3, col4, col5 = st.columns(5) @@ -105,31 +113,43 @@ def show_overview(): st.metric("Total Scans", str(recon_scans)) with col2: st.metric("Vulnerabilities", str(total_vulns)) - + # Dynamic System Health agent_status = st.session_state.vulnforge.agent.get_readiness_status() health_status = "Optimal" if agent_status["ready"] else "Degraded" - + with col3: st.metric("System Health", health_status) with col4: # Check if Ollama is running from modules.darkweb.robin.llm_utils import fetch_ollama_models + ollama_status = "Connected" if fetch_ollama_models() else "Disconnected" st.metric("Ollama AI", ollama_status) with col5: - is_agentic = os.getenv("VULNFORGE_AGENTIC") == "1" or st.session_state.vulnforge.agentic_mode + is_agentic = ( + os.getenv("VULNFORGE_AGENTIC") == "1" + or st.session_state.vulnforge.agentic_mode + ) agent_ui_status = "Enabled" if is_agentic else "Disabled" st.metric("Agentic AI", agent_ui_status) st.markdown("### 🕒 Recent Activity") # Fetch last 5 modified items in results - recent_items = sorted(list(results_dir.glob("*")), key=lambda p: p.stat().st_mtime, reverse=True)[:5] + recent_items = sorted( + list(results_dir.glob("*")), key=lambda p: p.stat().st_mtime, reverse=True + )[:5] activity_data = [] for item in recent_items: mtime = datetime.fromtimestamp(item.stat().st_mtime).strftime("%Y-%m-%d %H:%M") - activity_data.append({"Time": mtime, "Activity": f"Scan results found for {item.name}", "Status": "Success"}) - + activity_data.append( + { + "Time": mtime, + "Activity": f"Scan results found for {item.name}", + "Status": "Success", + } + ) + if activity_data: st.table(activity_data) else: @@ -138,22 +158,22 @@ def show_overview(): def show_recon(): st.title("🔍 Reconnaissance Module") - + target = st.text_input("Enter Target Domain", placeholder="example.com") - + col1, col2 = st.columns(2) with col1: scan_type = st.selectbox("Scan Intensity", ["Passive", "Normal", "Aggressive"]) with col2: use_ai = st.checkbox("Enable AI Analysis", value=True) - + if st.button("🚀 Start Recon Scan"): if not target: st.error("Please enter a target domain.") else: status_slot = st.empty() progress_bar = st.progress(0) - + async def run_scan(): status_slot.info(f"🔍 Starting reconnaissance on {target}...") # We need to bridge async with streamlit @@ -165,115 +185,141 @@ async def run_scan(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) results = loop.run_until_complete(run_scan()) - + if "error" in results: st.error(f"Scan failed: {results['error']}") else: st.success(f"Scan completed for {target}!") - + st.markdown("### 📊 Scan Results") c1, c2, c3 = st.columns(3) c1.metric("Subdomains", len(results["subdomains"])) c2.metric("Services", len(results["web_services"])) c3.metric("Vulns", len(results["vulnerabilities"])) - + with st.expander("🌐 Subdomains Found", expanded=True): st.write(", ".join(results["subdomains"])) - + if results["web_services"]: with st.expander("🚀 Web Services", expanded=False): st.table(results["web_services"]) - + if results["vulnerabilities"]: with st.expander("⚠️ Vulnerabilities", expanded=True): st.table(results["vulnerabilities"]) if results["ai_analysis"]: st.markdown("### 🤖 AI Analysis Report") - st.write(results["ai_analysis"].get("raw_analysis", "No analysis available.")) + st.write( + results["ai_analysis"].get( + "raw_analysis", "No analysis available." + ) + ) def show_robin(): # Integrate existing Robin UI logic here or import it st.title("🕷️ Robin - Dark Web OSINT") - + # We can import the UI components from Robin try: from modules.darkweb.robin.ui import ( - get_model_choices, get_llm, refine_query, - cached_search_results, filter_results, - cached_scrape_multiple, generate_summary, - BufferedStreamingHandler, OLLAMA_MAIN_MODEL + get_model_choices, + get_llm, + refine_query, + cached_search_results, + filter_results, + cached_scrape_multiple, + generate_summary, + BufferedStreamingHandler, + OLLAMA_MAIN_MODEL, ) - + model_options = get_model_choices() default_model = OLLAMA_MAIN_MODEL or "gpt-5-mini" - default_model_index = next((i for i, n in enumerate(model_options) if n.lower() == default_model.lower()), 0) - + default_model_index = next( + ( + i + for i, n in enumerate(model_options) + if n.lower() == default_model.lower() + ), + 0, + ) + c1, c2 = st.columns([3, 1]) with c1: - query = st.text_input("Dark Web Query", placeholder="e.g. 'ransomware' or 'leaked databases'") + query = st.text_input( + "Dark Web Query", placeholder="e.g. 'ransomware' or 'leaked databases'" + ) with c2: model = st.selectbox("LLM Model", model_options, index=default_model_index) - + threads = st.slider("Scraping Threads", 1, 16, 4) - + if st.button("🔍 Run Robin Investigation"): if query: progress_bar = st.progress(0) status_slot = st.empty() - + status_slot.info("🔄 Initializing LLM...") llm = get_llm(model) progress_bar.progress(20) - + status_slot.info("🔄 Refining query...") refined = refine_query(llm, query) progress_bar.progress(40) - + status_slot.info("🔍 Searching dark web...") results = cached_search_results(refined, threads) progress_bar.progress(60) - + status_slot.info("🗂️ Filtering & Scraping...") filtered = filter_results(llm, refined, results) scraped = cached_scrape_multiple(filtered, threads) progress_bar.progress(80) - + status_slot.info("✍️ Generating final report...") summary_slot = st.empty() + def ui_emit(chunk): summary_slot.markdown(chunk) - + stream_handler = BufferedStreamingHandler(ui_callback=ui_emit) llm.callbacks = [stream_handler] summary = generate_summary(llm, query, scraped) - + progress_bar.progress(100) status_slot.success("✔️ Investigation Completed!") st.markdown(summary) else: st.warning("Please enter a query.") - + except Exception as e: st.error(f"Critical error in Robin module: {e}") + def show_tools(): st.title("🛠️ System Tool Manager") - + required_tools = [ - "nmap", "subfinder", "httpx", "gobuster", - "nuclei", "ffuf", "whatweb", "dig" + "nmap", + "subfinder", + "httpx", + "gobuster", + "nuclei", + "ffuf", + "whatweb", + "dig", ] - + st.markdown("Check the installation status of required system tools.") - + for tool in required_tools: col1, col2, col3 = st.columns([3, 2, 1]) col1.write(f"**{tool.capitalize()}**") - + is_installed = st.session_state.vulnforge.is_tool_installed(tool) - + if is_installed: col2.success("✅ Installed") if col3.button("Verify", key=tool): @@ -282,14 +328,16 @@ def show_tools(): col2.error("❌ Missing") if col3.button("Install", key=tool): with st.spinner(f"Installing {tool}..."): - # This is a bit dangerous to run directly from UI without sudo handled, + # This is a bit dangerous to run directly from UI without sudo handled, # but we can try or show instructions - st.warning(f"Please run 'sudo apt install {tool}' or check VulnForge CLI to install.") + st.warning( + f"Please run 'sudo apt install {tool}' or check VulnForge CLI to install." + ) def show_ai(): st.title("🤖 AI Security Assistant") - + if "messages" not in st.session_state: st.session_state.messages = [] @@ -304,37 +352,52 @@ def show_ai(): with st.chat_message("assistant"): with st.spinner("Thinking..."): - response = st.session_state.vulnforge.ai_analyzer.ollama.generate(prompt) + response = st.session_state.vulnforge.ai_analyzer.ollama.generate( + prompt + ) st.markdown(response) - st.session_state.messages.append({"role": "assistant", "content": response}) + st.session_state.messages.append( + {"role": "assistant", "content": response} + ) + def show_reports(): st.title("📑 Intelligence Reports") - + results_dir = st.session_state.vulnforge.results_dir reports = list(results_dir.glob("**/*.md")) + list(results_dir.glob("**/*.json")) - + if not reports: - st.info("No reports generated yet. Run a Recon or Robin scan to see reports here.") + st.info( + "No reports generated yet. Run a Recon or Robin scan to see reports here." + ) return # Filter and sort reports = sorted(reports, key=lambda p: p.stat().st_mtime, reverse=True) - + for report in reports: with st.container(border=True): c1, c2, c3 = st.columns([4, 2, 1]) c1.write(f"**{report.name}**") c1.caption(f"Path: {report.parent.name}") - c2.write(datetime.fromtimestamp(report.stat().st_mtime).strftime("%Y-%m-%d %H:%M")) - + c2.write( + datetime.fromtimestamp(report.stat().st_mtime).strftime( + "%Y-%m-%d %H:%M" + ) + ) + with open(report, "rb") as f: btn = c3.download_button( label="📥", data=f, file_name=report.name, - mime="text/markdown" if report.suffix == ".md" else "application/json", - key=str(report) + mime=( + "text/markdown" + if report.suffix == ".md" + else "application/json" + ), + key=str(report), ) @@ -342,14 +405,15 @@ def show_settings(): st.title("⚙️ Framework Settings") st.subheader("Ollama Configuration") st.text_input("Ollama Base URL", value="http://localhost:11434") - + st.subheader("API Integration") st.text_input("OpenAI API Key", type="password") st.text_input("Shodan API Key", type="password") - + if st.button("💾 Save Settings"): st.success("Settings saved successfully!") + # --- Page Routing --- if st.session_state.nav == "Overview": show_overview() diff --git a/vulnforge_main.py b/vulnforge_main.py index 4a34686..7c97a42 100755 --- a/vulnforge_main.py +++ b/vulnforge_main.py @@ -31,13 +31,13 @@ RateLimiter, FilePermissionManager, validate_target, - sanitize_filename + sanitize_filename, ) from utils.auth import get_auth_manager, Permission from recon_module import EnhancedReconModule from ai_integration import AIAnalyzer, OllamaClient -from ai_orchestrator import AIOrchestrator # New Import +from ai_orchestrator import AIOrchestrator # New Import import modules.darkweb as darkweb_module from modules.ai.agent import VulnForgeAgent @@ -144,8 +144,12 @@ def install_missing_tools(self): self.logger.info("Installing %s...", tool) try: # SECURITY FIX: Use full executable path and handle subprocess failures - result = subprocess.run(["/usr/bin/go", "install", package], - capture_output=True, text=True, check=True) + result = subprocess.run( + ["/usr/bin/go", "install", package], + capture_output=True, + text=True, + check=True, + ) self.logger.info("Successfully installed %s", tool) except subprocess.CalledProcessError as e: self.logger.error("Failed to install %s: %s", tool, e) @@ -159,7 +163,9 @@ def install_missing_tools(self): async def run_recon(self, target: str, output_dir: Optional[Path] = None): """Run reconnaissance on target""" - recon = EnhancedReconModule(self.base_dir, self.ai_analyzer, config_path="configs/tools.json") + recon = EnhancedReconModule( + self.base_dir, self.ai_analyzer, config_path="configs/tools.json" + ) return await recon.run_recon(target, output_dir) def ask_ai(self, question: str): @@ -178,9 +184,9 @@ def ask_ai(self, question: str): ) @RateLimiter(max_calls=5, time_window=60) - def generate_tool(self, description: str, identifier: str = 'default'): + def generate_tool(self, description: str, identifier: str = "default"): """Generate a custom tool using AI and save it to custom_tools directory. - + Args: description: Tool description identifier: Rate limit identifier (username/session) @@ -190,12 +196,12 @@ def generate_tool(self, description: str, identifier: str = 'default'): if not description or not isinstance(description, str): self.logger.error("Invalid tool description") return - + # SECURITY: Limit description length if len(description) > 500: self.logger.error("Tool description too long (max 500 chars)") return - + # Use os.path.expanduser to properly handle home directory tool_dir = Path.home() / ".vulnforge" / "custom_tools" self.console.print( @@ -229,10 +235,14 @@ def generate_tool(self, description: str, identifier: str = 'default'): base_name = re.sub(r"[^a-zA-Z0-9]+", "_", description.strip().lower())[ :32 ].strip("_") - filename = sanitize_filename(f"{base_name or 'custom_tool'}_{int(time.time())}.py") - + filename = sanitize_filename( + f"{base_name or 'custom_tool'}_{int(time.time())}.py" + ) + # SECURITY: Validate path to prevent traversal - tool_path = SecurityValidator.sanitize_path(str(tool_dir / filename), base_dir=tool_dir) + tool_path = SecurityValidator.sanitize_path( + str(tool_dir / filename), base_dir=tool_dir + ) if not tool_path: self.logger.error("Invalid tool path") return @@ -240,7 +250,7 @@ def generate_tool(self, description: str, identifier: str = 'default'): # Write the generated tool to file with open(tool_path, "w", encoding="utf-8") as f: f.write(response) - + # SECURITY: Set secure file permissions (0o600) for generated tool files FilePermissionManager.set_secure_permissions(tool_path, mode=0o600) self.console.print( @@ -286,13 +296,13 @@ def list_custom_tools(self): try: # SECURITY: Use Path and validate directory tool_dir = Path.home() / ".vulnforge" / "custom_tools" - + # SECURITY: Validate path tool_dir = SecurityValidator.sanitize_path(str(tool_dir)) if not tool_dir: self.logger.error("Invalid tool directory path") return - + metadata_path = tool_dir / "metadata.json" if not tool_dir.exists(): @@ -419,7 +429,7 @@ def get_parser(): For detailed documentation, visit: https://github.com/Arunking9/VulnForge """, - formatter_class=argparse.RawDescriptionHelpFormatter + formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument("--target", "-t", help="Target domain or IP") parser.add_argument( @@ -455,74 +465,108 @@ def get_parser(): "--stealth", "-s", action="store_true", help="Enable stealth mode" ) parser.add_argument( - "--ai-pipeline", action="store_true", help="Enable the advanced multi-prompt AI pipeline." + "--ai-pipeline", + action="store_true", + help="Enable the advanced multi-prompt AI pipeline.", ) parser.add_argument( - "--prompt-dir", help="Directory for the AI pipeline prompts.", default="prompts/system_prompts" + "--prompt-dir", + help="Directory for the AI pipeline prompts.", + default="prompts/system_prompts", ) parser.add_argument( - "--uninstall", action="store_true", help="Uninstall VulnForge and its components" + "--uninstall", + action="store_true", + help="Uninstall VulnForge and its components", ) parser.add_argument( - "--webmod", action="store_true", help="Launch VulnForge web interface (Streamlit UI)" + "--webmod", + action="store_true", + help="Launch VulnForge web interface (Streamlit UI)", ) parser.add_argument( - "--web-host", default="localhost", help="Web interface host (default: localhost)" + "--web-host", + default="localhost", + help="Web interface host (default: localhost)", ) parser.add_argument( "--web-port", type=int, default=8501, help="Web interface port (default: 8501)" ) parser.add_argument( - "--agentic", "--ai-agent", action="store_true", help="Enable simple agentic AI mode" + "--agentic", + "--ai-agent", + action="store_true", + help="Enable simple agentic AI mode", ) # Add subparsers for commands - subparsers = parser.add_subparsers(dest='command', help='Available commands') - + subparsers = parser.add_subparsers(dest="command", help="Available commands") + # Ask AI command - ask_ai_parser = subparsers.add_parser('ask-ai', help='Ask the AI assistant a question') - ask_ai_parser.add_argument('question', help='The question to ask the AI') - ask_ai_parser.add_argument('--verbose', action='store_true', help='Show detailed model logs') - ask_ai_parser.add_argument('--dangerous', action='store_true', help='Enable dangerous mode') - ask_ai_parser.add_argument('--confirm-danger', action='store_true', help='Confirm dangerous mode') - + ask_ai_parser = subparsers.add_parser( + "ask-ai", help="Ask the AI assistant a question" + ) + ask_ai_parser.add_argument("question", help="The question to ask the AI") + ask_ai_parser.add_argument( + "--verbose", action="store_true", help="Show detailed model logs" + ) + ask_ai_parser.add_argument( + "--dangerous", action="store_true", help="Enable dangerous mode" + ) + ask_ai_parser.add_argument( + "--confirm-danger", action="store_true", help="Confirm dangerous mode" + ) + # Generate tool command - generate_tool_parser = subparsers.add_parser('generate-tool', help='Generate a custom tool') - generate_tool_parser.add_argument('description', help='Description of the tool to generate') - generate_tool_parser.add_argument('--verbose', action='store_true', help='Show detailed generation logs') - + generate_tool_parser = subparsers.add_parser( + "generate-tool", help="Generate a custom tool" + ) + generate_tool_parser.add_argument( + "description", help="Description of the tool to generate" + ) + generate_tool_parser.add_argument( + "--verbose", action="store_true", help="Show detailed generation logs" + ) + # List tools command - list_tools_parser = subparsers.add_parser('list-tools', help='List all custom tools') - list_tools_parser.add_argument('--verbose', action='store_true', help='Show detailed tool information') + list_tools_parser = subparsers.add_parser( + "list-tools", help="List all custom tools" + ) + list_tools_parser.add_argument( + "--verbose", action="store_true", help="Show detailed tool information" + ) # Dark web OSINT command (Robin integration) darkweb_parser = subparsers.add_parser( - 'darkweb', help='Run the Robin dark web OSINT workflow' + "darkweb", help="Run the Robin dark web OSINT workflow" + ) + darkweb_parser.add_argument( + "--query", "-q", required=True, help="Dark web search query" ) - darkweb_parser.add_argument('--query', '-q', required=True, help='Dark web search query') darkweb_parser.add_argument( - '--model', - '-m', + "--model", + "-m", choices=darkweb_module.get_robin_model_choices(), default=darkweb_module.ROBIN_DEFAULT_MODEL, - help='LLM model to use for refinement/filtering', + help="LLM model to use for refinement/filtering", ) darkweb_parser.add_argument( - '--threads', - '-t', + "--threads", + "-t", type=int, default=5, - help='Number of concurrent requests for search/scrape', + help="Number of concurrent requests for search/scrape", ) darkweb_parser.add_argument( - '--output', - '-o', - help='Optional output file or directory for the markdown report', + "--output", + "-o", + help="Optional output file or directory for the markdown report", ) args = parser.parse_args() return parser, args + async def _async_main(args): # Initialize VulnForge vf = VulnForge() @@ -532,11 +576,11 @@ async def _async_main(args): if args.uninstall: script_dir = Path(__file__).parent uninstall_script = script_dir / "uninstall_script.sh" - + if not uninstall_script.exists(): print(f"Error: Uninstall script not found at {uninstall_script}") return - + try: subprocess.run([str(uninstall_script)], check=True) except subprocess.CalledProcessError as e: @@ -548,14 +592,16 @@ async def _async_main(args): # Handle AI Pipeline Mode if args.ai_pipeline: if not args.target: - print("Error: A target is required for AI pipeline mode, e.g., --target 'scan example.com'") + print( + "Error: A target is required for AI pipeline mode, e.g., --target 'scan example.com'" + ) return - + prompt_path = Path(args.prompt_dir) if not prompt_path.exists(): print(f"Error: Prompt directory not found at '{prompt_path}'") return - + orchestrator = AIOrchestrator(prompt_path) orchestrator.execute_task(f"Perform a security scan on {args.target}") return @@ -572,8 +618,10 @@ async def _async_main(args): vf.console.print("\n[bold cyan]--- Agentic Action Plan ---[/bold cyan]") vf.console.print(json.dumps(result, indent=2)) else: - vf.console.print("\n[bold yellow]Agentic mode enabled. Ready for instructions...[/bold yellow]") - + vf.console.print( + "\n[bold yellow]Agentic mode enabled. Ready for instructions...[/bold yellow]" + ) + # If we are not in web mode, we might want an interactive CLI loop here # For now, we'll just continue to respect other flags. @@ -598,9 +646,11 @@ async def _async_main(args): # Check if Robin is available if not darkweb_module.ROBIN_AVAILABLE: print("❌ Error: Robin module dependencies not installed.") - print("Install with: pip install langchain-core langchain-openai langchain-ollama") + print( + "Install with: pip install langchain-core langchain-openai langchain-ollama" + ) return - + darkweb_module.run_darkweb_osint( args.query, model=args.model, @@ -631,7 +681,7 @@ async def _async_main(args): "Use --target to specify a domain or IP address you own or have authorization to test" ) return - + # SECURITY: Validate target input if not validate_target(args.target): print(f"Error: Invalid target format: {args.target}") @@ -654,11 +704,11 @@ async def _async_main(args): # SECURITY: Create session directory with secure permissions timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - + # SECURITY: Sanitize target for use in path safe_target = sanitize_filename(args.target) session_dir = vf.base_dir / "sessions" / safe_target / timestamp - + # SECURITY: Create with restricted permissions if not FilePermissionManager.create_secure_directory(session_dir, mode=0o700): print("Error: Failed to create secure session directory") @@ -732,41 +782,47 @@ def main(): print("❌ Error: Robin module dependencies not installed.") print("\nInstall required packages:") print(" pip install langchain-core langchain-openai langchain-ollama") - print(" pip install langchain-anthropic langchain-google-genai langchain-community") + print( + " pip install langchain-anthropic langchain-google-genai langchain-community" + ) print("\nOr install all requirements:") print(" pip install -r requirements.txt") return - + print("🌐 Launching VulnForge Web Interface...") print(f"📍 Access the UI at: http://{args.web_host}:{args.web_port}") print("⚠️ Press Ctrl+C to stop the server\n") - + try: # Import streamlit CLI from streamlit.web import cli as stcli import sys - + # Get the UI file path ui_file = Path(__file__).parent / "modules" / "web" / "dashboard.py" - + if not ui_file.exists(): print(f"❌ Error: Web UI file not found at {ui_file}") return - + # Prepare streamlit arguments sys.argv = [ "streamlit", "run", str(ui_file), - "--server.port", str(args.web_port), - "--server.address", args.web_host, - "--server.headless", "true", - "--browser.gatherUsageStats", "false" + "--server.port", + str(args.web_port), + "--server.address", + args.web_host, + "--server.headless", + "true", + "--browser.gatherUsageStats", + "false", ] - + # Launch streamlit sys.exit(stcli.main()) - + except ImportError: print("❌ Error: Streamlit is not installed.") return @@ -777,5 +833,6 @@ def main(): # Run the main async pipeline asyncio.run(_async_main(args)) + if __name__ == "__main__": main()