diff --git a/__pycache__/llm-tools-nmap.cpython-313.pyc b/__pycache__/llm-tools-nmap.cpython-313.pyc
new file mode 100644
index 0000000..595183d
Binary files /dev/null and b/__pycache__/llm-tools-nmap.cpython-313.pyc differ
diff --git a/ollama_nmap_agent.py b/ollama_nmap_agent.py
new file mode 100644
index 0000000..0778e82
--- /dev/null
+++ b/ollama_nmap_agent.py
@@ -0,0 +1,328 @@
+#!/usr/bin/env python3
+"""
+Ollama + Nmap agent (robustified).
+
+Features / changes from original:
+ - Robust import of 'llm-tools-nmap' (tries multiple plausible names).
+ - Robust handling of different ollama.chat return shapes (dict, list, generator).
+ - Reliable extraction of the first top-level JSON object from model output
+   using brace-balancing instead of a greedy regex.
+ - Better error handling when trying candidate llm-tools entrypoints.
+ - Minor UX and safety improvements.
+"""
+
+import argparse
+import json
+import re
+import shlex
+import subprocess
+import sys
+import ipaddress
+from typing import Optional, Dict, Any, Iterable
+import importlib
+
+# Try to import ollama client (install in venv: pip install ollama)
+try:
+    import ollama
+except Exception:
+    print("ERROR: 'ollama' python package not installed in current environment.", file=sys.stderr)
+    print("Install in venv: pip install ollama", file=sys.stderr)
+    raise
+
+# Try to import llm-tools-nmap (optional). Be flexible about package name.
+_llm_tools = None
+_llm_candidates = ("llm_tools_nmap", "llm-tools-nmap", "llm_tools.nmap", "llm_tools_nmap_py")
+for name in _llm_candidates:
+    try:
+        _llm_tools = importlib.import_module(name)
+        print(f"[*] Imported llm-tools module: {name}")
+        break
+    except Exception:
+        _llm_tools = None
+
+# UTIL: check if address is allowed (private/local) unless force override
+def target_allowed(target: str) -> bool:
+    """
+    Return True if target is loopback or private IP or 'localhost'.
+    Note: we do NOT resolve arbitrary hostnames here for safety.
+    """
+    try:
+        ip = ipaddress.ip_address(target)
+        return ip.is_loopback or ip.is_private
+    except Exception:
+        # not an IP (likely hostname) — allow only exact localhost variants
+        if target.lower() in ("localhost", "127.0.0.1", "::1"):
+            return True
+        return False
+
+# UTIL: run nmap and return parsed summary using nmap's XML output
+def run_nmap_direct(target: str, ports: Optional[str], flags: Optional[str]) -> Dict[str, Any]:
+    cmd = ["nmap", "-oX", "-"]
+    if flags:
+        cmd += shlex.split(flags)
+    if ports:
+        cmd += ["-p", ports]
+    cmd += [target]
+    print("[*] Running:", " ".join(shlex.quote(c) for c in cmd))
+    proc = subprocess.run(cmd, capture_output=True, text=True)
+    if proc.returncode != 0 and proc.stdout.strip() == "":
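+        # Non-zero exit plus empty stdout means nmap produced no XML at all
+        # (bad flags, unresolvable target, missing binary), so surface stderr instead.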
stderr:", proc.stderr.strip(), file=sys.stderr) + return {"error": proc.stderr.strip()} + xml = proc.stdout + try: + import xml.etree.ElementTree as ET + root = ET.fromstring(xml) + hosts = [] + for host in root.findall("host"): + addr = host.find("address") + ipaddr = addr.get("addr") if addr is not None else None + hostdict = {"ip": ipaddr, "ports": []} + ports_el = host.find("ports") + if ports_el is None: + hosts.append(hostdict) + continue + for p in ports_el.findall("port"): + portnum = p.get("portid") + protocol = p.get("protocol") + state_el = p.find("state") + state = state_el.get("state") if state_el is not None else None + service_el = p.find("service") + service = service_el.get("name") if service_el is not None else None + hostdict["ports"].append({ + "port": portnum, + "protocol": protocol, + "state": state, + "service": service + }) + hosts.append(hostdict) + return {"hosts": hosts} + except Exception as e: + return {"error": f"XML parse error: {e}", "raw_xml": xml} + + +# UTIL: try to call llm-tools-nmap if available (best-effort) +def run_llm_tools_nmap(target: str, ports: Optional[str], flags: Optional[str]): + if _llm_tools is None: + raise ImportError("llm-tools-nmap not importable") + + # candidate function names and expected parameter orders + candidates = [ + ("run_scan", ("target", "ports", "flags")), + ("nmap_scan", ("target", "ports", "flags")), + ("scan", ("target", "ports", "flags")), + ("do_scan", ("target", "ports", "flags")), + ] + last_exc = None + for name, params in candidates: + if hasattr(_llm_tools, name): + fn = getattr(_llm_tools, name) + try: + # try keyword call first + return fn(target=target, ports=ports, flags=flags) + except TypeError: + try: + # fallback to positional call + return fn(target, ports, flags) + except Exception as e: + last_exc = e + # don't abort; try next candidate + if last_exc: + raise RuntimeError(f"llm-tools-nmap imported but calls failed. Last error: {last_exc}") + raise RuntimeError("llm-tools-nmap imported but no known entrypoint found") + + +# Compose system prompt telling model to reply with strict JSON +SYSTEM_INSTRUCTION = """ +You are an assistant that must produce JSON ONLY (no extra text) describing a single action. +Valid actions: + 1) {"action":"scan","target":"","ports":"22,80","flags":"-sS -Pn"} + 2) {"action":"explain","text":"..."} (assistant should just explain, no scan) +Respond only with the JSON object. If you must ask a question, produce: + {"action":"question","text":""} +""" + +def normalize_ollama_response(resp: Any) -> str: + """ + Normalize common ollama.chat return shapes to a plain text string. 
+    Supports:
+      - dicts containing 'message', 'content', or 'text'
+      - lists of messages
+      - generator/iterator of pieces (streaming)
+      - fallback to str(resp)
+    """
+    # streaming generator / iterator (not a str)
+    if isinstance(resp, Iterable) and not isinstance(resp, (str, bytes, dict, list)):
+        try:
+            # consume and join text chunks
+            parts = []
+            for chunk in resp:
+                if isinstance(chunk, dict):
+                    if "message" in chunk and isinstance(chunk["message"], dict) and "content" in chunk["message"]:
+                        parts.append(str(chunk["message"]["content"]))
+                    elif "content" in chunk:
+                        parts.append(str(chunk["content"]))
+                    else:
+                        parts.append(json.dumps(chunk))
+                else:
+                    parts.append(str(chunk))
+            return "".join(parts)
+        except TypeError:
+            # not actually iterable
+            pass
+
+    if isinstance(resp, dict):
+        # known shapes
+        if "message" in resp:
+            m = resp.get("message")
+            if isinstance(m, dict):
+                # content may be text or list/dict
+                c = m.get("content")
+                if isinstance(c, str):
+                    return c
+                try:
+                    return json.dumps(c)
+                except Exception:
+                    return str(c)
+        if "content" in resp and isinstance(resp["content"], str):
+            return resp["content"]
+        if "text" in resp and isinstance(resp["text"], str):
+            return resp["text"]
+        # fallback stringify
+        try:
+            return json.dumps(resp)
+        except Exception:
+            return str(resp)
+
+    if isinstance(resp, list):
+        # list of messages or strings
+        parts = []
+        for item in resp:
+            if isinstance(item, dict):
+                parts.append(normalize_ollama_response(item))
+            else:
+                parts.append(str(item))
+        return "\n".join(parts)
+
+    return str(resp)
+
+def extract_first_json(text: str) -> Optional[str]:
+    """
+    Return the substring containing the first balanced JSON object found in `text`.
+    Uses simple brace-balancing to avoid greedy regex pitfalls.
+    """
+    start = None
+    depth = 0
+    for i, ch in enumerate(text):
+        if ch == "{":
+            if start is None:
+                start = i
+            depth += 1
+        elif ch == "}":
+            if depth > 0:
+                depth -= 1
+                if depth == 0 and start is not None:
+                    return text[start:i+1]
+    return None
+
+def ask_model_for_action(user_prompt: str, model_name: str = "dolphin-llama3:8b") -> Dict[str, Any]:
+    messages = [
+        {"role": "system", "content": SYSTEM_INSTRUCTION},
+        {"role": "user", "content": user_prompt},
+    ]
+    resp = ollama.chat(model=model_name, messages=messages)
+    text = normalize_ollama_response(resp)
+    json_text = extract_first_json(text)
+    if not json_text:
+        raise ValueError(f"Model did not return JSON. Raw model output:\n{text}")
+    # try parse, with a tolerant fallback to single-quote replacement
+    try:
+        return json.loads(json_text)
+    except Exception as e:
+        fixed = json_text.replace("'", '"')
+        try:
+            return json.loads(fixed)
+        except Exception:
+            raise ValueError(f"Could not parse JSON from model output. Raw:\n{json_text}\nError: {e}")
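+
+# Illustrative sketch of the extraction path on a noisy reply (example values only):
+#   raw = 'Sure! {"action": "scan", "target": "127.0.0.1", "ports": "22", "flags": ""}'
+#   extract_first_json(raw) -> '{"action": "scan", ...}'  (first balanced object; chatter ignored)
+#   json.loads(...)         -> dict consumed by main() below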
+def main():
+    parser = argparse.ArgumentParser(description="Ollama-driven Nmap agent (offline).")
+    parser.add_argument("--model", default="dolphin-llama3:8b", help="Ollama model name")
+    parser.add_argument("--prompt", help="Initial prompt to the model (if omitted, interactive mode).")
+    parser.add_argument("--yes", action="store_true", help="Auto-confirm scans (use with care).")
+    parser.add_argument("--force", action="store_true", help="Allow scanning non-private targets (unsafe).")
+    args = parser.parse_args()
+
+    model_name = args.model
+
+    if args.prompt:
+        user_prompt = args.prompt
+    else:
+        try:
+            print("Enter what you want the assistant to do (examples: 'Scan 127.0.0.1 for SSH and HTTP'):")
+            user_prompt = input("> ").strip()
+        except KeyboardInterrupt:
+            print("\nInterrupted.")
+            sys.exit(0)
+        if not user_prompt:
+            print("No prompt given, exiting.")
+            sys.exit(0)
+
+    try:
+        action = ask_model_for_action(user_prompt, model_name=model_name)
+    except Exception as e:
+        print("ERROR parsing model output:", e, file=sys.stderr)
+        sys.exit(1)
+
+    print("[*] Model action:", action)
+
+    if action.get("action") == "question":
+        print("[MODEL QUESTION] ", action.get("text"))
+        sys.exit(0)
+
+    if action.get("action") == "explain":
+        print("[MODEL EXPLANATION]\n", action.get("text"))
+        sys.exit(0)
+
+    if action.get("action") != "scan":
+        print("Unknown action from model:", action.get("action"))
+        sys.exit(1)
+
+    target = action.get("target")
+    ports = action.get("ports")
+    flags = action.get("flags")
+
+    if not target:
+        print("No target provided by model.")
+        sys.exit(1)
+
+    if not args.force and not target_allowed(target):
+        print(f"Refusing to scan '{target}' because it's not localhost/private. Use --force to override.")
+        sys.exit(1)
+
+    if not args.yes:
+        print(f"About to run Nmap on target {target} (ports={ports}, flags={flags}). Type 'yes' to continue:")
+        confirm = input("> ").strip().lower()
+        if confirm not in ("y", "yes"):
+            print("Aborted by user.")
+            sys.exit(0)
+
+    # Try llm-tools-nmap integration first
+    if _llm_tools is not None:
+        try:
+            print("[*] Trying llm-tools-nmap integration...")
+            result = run_llm_tools_nmap(target=target, ports=ports, flags=flags)
+            print("[*] llm-tools-nmap result:")
+            print(json.dumps(result, indent=2))
+            sys.exit(0)
+        except Exception as e:
+            print("[!] llm-tools-nmap integration failed or not applicable:", e, file=sys.stderr)
+            print("[*] Falling back to direct nmap run.")
+
+    # Fallback: run nmap directly and parse XML
+    res = run_nmap_direct(target=target, ports=ports, flags=flags)
+    print("[*] Parsed result:")
+    print(json.dumps(res, indent=2))
+
+
+if __name__ == "__main__":
+    main()
diff --git a/test.py b/test.py
new file mode 100644
index 0000000..3ca9aa0
--- /dev/null
+++ b/test.py
@@ -0,0 +1,351 @@
+#!/usr/bin/env python3
+"""
+Ollama + Nmap agent (robustified).
+
+Features / changes from original:
+ - Robust import of 'llm-tools-nmap' (tries multiple plausible names).
+ - Robust handling of different ollama.chat return shapes (dict, list, generator).
+ - Reliable extraction of the first top-level JSON object from model output
+   using brace-balancing instead of a greedy regex.
+ - Better error handling when trying candidate llm-tools entrypoints.
+ - Minor UX and safety improvements.
+""" + +import argparse +import json +import re +import shlex +import subprocess +import sys +import inspect +import ipaddress +from typing import Optional, Dict, Any, Iterable +import importlib + +# Try to import ollama client (install in venv: pip install ollama) +try: + import ollama +except Exception: + print("ERROR: 'ollama' python package not installed in current environment.", file=sys.stderr) + print("Install in venv: pip install ollama", file=sys.stderr) + raise + +# Try to import llm-tools-nmap (optional). Be flexible about package name. +_llm_tools = None +_llm_candidates = ("llm_tools_nmap", "llm-tools-nmap", "llm_tools.nmap", "llm_tools_nmap_py") +for name in _llm_candidates: + try: + _llm_tools = importlib.import_module(name) + print(f"[*] Imported llm-tools module: {name}") + break + except Exception: + _llm_tools = None + +# UTIL: check if address is allowed (private/local) unless force override +def target_allowed(target: str) -> bool: + """ + Return True if target is loopback or private IP or 'localhost'. + Note: we do NOT resolve arbitrary hostnames here for safety. + """ + try: + ip = ipaddress.ip_address(target) + return ip.is_loopback or ip.is_private + except Exception: + # not an IP (likely hostname) — allow only exact localhost variants + if target.lower() in ("localhost", "127.0.0.1", "::1"): + return True + return False + +# UTIL: run nmap and return parsed summary using nmap's XML output +def run_nmap_direct(target: str, ports: Optional[str], flags: Optional[str]) -> Dict[str, Any]: + cmd = ["nmap", "-oX", "-"] + if flags: + cmd += shlex.split(flags) + if ports: + cmd += ["-p", ports] + cmd += [target] + print("[*] Running:", " ".join(shlex.quote(c) for c in cmd)) + proc = subprocess.run(cmd, capture_output=True, text=True) + if proc.returncode != 0 and proc.stdout.strip() == "": + print("[!] nmap returned non-zero exit and no XML. 
stderr:", proc.stderr.strip(), file=sys.stderr) + return {"error": proc.stderr.strip()} + xml = proc.stdout + try: + import xml.etree.ElementTree as ET + root = ET.fromstring(xml) + hosts = [] + for host in root.findall("host"): + addr = host.find("address") + ipaddr = addr.get("addr") if addr is not None else None + hostdict = {"ip": ipaddr, "ports": []} + ports_el = host.find("ports") + if ports_el is None: + hosts.append(hostdict) + continue + for p in ports_el.findall("port"): + portnum = p.get("portid") + protocol = p.get("protocol") + state_el = p.find("state") + state = state_el.get("state") if state_el is not None else None + service_el = p.find("service") + service = service_el.get("name") if service_el is not None else None + hostdict["ports"].append({ + "port": portnum, + "protocol": protocol, + "state": state, + "service": service + }) + hosts.append(hostdict) + return {"hosts": hosts} + except Exception as e: + return {"error": f"XML parse error: {e}", "raw_xml": xml} + + +# UTIL: try to call llm-tools-nmap if available (best-effort) +import inspect + +def run_llm_tools_nmap(target: str, ports: Optional[str], flags: Optional[str]): + if _llm_tools is None: + raise ImportError("llm-tools-nmap not importable") + + candidates = [ + ("run_scan", ("target", "ports", "flags")), + ("nmap_scan", ("target", "ports", "flags")), + ("scan", ("target", "ports", "flags")), + ("do_scan", ("target", "ports", "flags")), + ] + last_exc = None + for name, _params in candidates: + if hasattr(_llm_tools, name): + fn = getattr(_llm_tools, name) + sig = None + try: + sig = inspect.signature(fn) + except Exception: + sig = None + try: + # Build kwargs only for parameters the function accepts + kwargs = {} + if sig: + for p in sig.parameters.values(): + if p.name == "target": + kwargs["target"] = target + elif p.name == "ports": + kwargs["ports"] = ports + elif p.name == "flags": + kwargs["flags"] = flags + # If signature unknown, try keyword call (best-effort) + if kwargs: + return fn(**kwargs) + else: + # last resort: try fewer-positional args patterns + try: + return fn(target, ports, flags) + except TypeError: + try: + return fn(target, ports) + except TypeError: + return fn(target) + except Exception as e: + last_exc = e + # keep trying other entrypoints + if last_exc: + raise RuntimeError(f"llm-tools-nmap imported but calls failed. Last error: {last_exc}") + raise RuntimeError("llm-tools-nmap imported but no known entrypoint found") + + +# Compose system prompt telling model to reply with strict JSON +SYSTEM_INSTRUCTION = """ +You are an assistant that must produce JSON ONLY (no extra text) describing a single action. +Valid actions: + 1) {"action":"scan","target":"","ports":"22,80","flags":"-sS -Pn"} + 2) {"action":"explain","text":"..."} (assistant should just explain, no scan) +Respond only with the JSON object. If you must ask a question, produce: + {"action":"question","text":""} +""" + +def normalize_ollama_response(resp: Any) -> str: + """ + Normalize common ollama.chat return shapes to a plain text string. 
+    Supports:
+      - dicts containing 'message', 'content', or 'text'
+      - lists of messages
+      - generator/iterator of pieces (streaming)
+      - fallback to str(resp)
+    """
+    # streaming generator / iterator (not a str)
+    if isinstance(resp, Iterable) and not isinstance(resp, (str, bytes, dict, list)):
+        try:
+            # consume and join text chunks
+            parts = []
+            for chunk in resp:
+                if isinstance(chunk, dict):
+                    if "message" in chunk and isinstance(chunk["message"], dict) and "content" in chunk["message"]:
+                        parts.append(str(chunk["message"]["content"]))
+                    elif "content" in chunk:
+                        parts.append(str(chunk["content"]))
+                    else:
+                        parts.append(json.dumps(chunk))
+                else:
+                    parts.append(str(chunk))
+            return "".join(parts)
+        except TypeError:
+            # not actually iterable
+            pass
+
+    if isinstance(resp, dict):
+        # known shapes
+        if "message" in resp:
+            m = resp.get("message")
+            if isinstance(m, dict):
+                # content may be text or list/dict
+                c = m.get("content")
+                if isinstance(c, str):
+                    return c
+                try:
+                    return json.dumps(c)
+                except Exception:
+                    return str(c)
+        if "content" in resp and isinstance(resp["content"], str):
+            return resp["content"]
+        if "text" in resp and isinstance(resp["text"], str):
+            return resp["text"]
+        # fallback stringify
+        try:
+            return json.dumps(resp)
+        except Exception:
+            return str(resp)
+
+    if isinstance(resp, list):
+        # list of messages or strings
+        parts = []
+        for item in resp:
+            if isinstance(item, dict):
+                parts.append(normalize_ollama_response(item))
+            else:
+                parts.append(str(item))
+        return "\n".join(parts)
+
+    return str(resp)
+
+def extract_first_json(text: str) -> Optional[str]:
+    """
+    Return the substring containing the first balanced JSON object found in `text`.
+    Uses simple brace-balancing to avoid greedy regex pitfalls.
+    """
+    start = None
+    depth = 0
+    for i, ch in enumerate(text):
+        if ch == "{":
+            if start is None:
+                start = i
+            depth += 1
+        elif ch == "}":
+            if depth > 0:
+                depth -= 1
+                if depth == 0 and start is not None:
+                    return text[start:i+1]
+    return None
+
+def ask_model_for_action(user_prompt: str, model_name: str = "dolphin-llama3:8b") -> Dict[str, Any]:
+    messages = [
+        {"role": "system", "content": SYSTEM_INSTRUCTION},
+        {"role": "user", "content": user_prompt},
+    ]
+    resp = ollama.chat(model=model_name, messages=messages)
+    text = normalize_ollama_response(resp)
+    json_text = extract_first_json(text)
+    if not json_text:
+        raise ValueError(f"Model did not return JSON. Raw model output:\n{text}")
+    # try parse, with a tolerant fallback to single-quote replacement
+    try:
+        return json.loads(json_text)
+    except Exception as e:
+        fixed = json_text.replace("'", '"')
+        try:
+            return json.loads(fixed)
+        except Exception:
+            raise ValueError(f"Could not parse JSON from model output. Raw:\n{json_text}\nError: {e}")
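+
+# Sketch of the signature filtering in run_llm_tools_nmap above, with a hypothetical tool:
+#   def run_scan(target, ports=None): ...
+#   inspect.signature(run_scan).parameters -> {'target': ..., 'ports': ...}
+#   kwargs is built as {"target": target, "ports": ports}; an unsupported 'flags'
+#   parameter is simply never passed, so the call cannot fail on unexpected keywords.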
+def main():
+    parser = argparse.ArgumentParser(description="Ollama-driven Nmap agent (offline).")
+    parser.add_argument("--model", default="dolphin-llama3:8b", help="Ollama model name")
+    parser.add_argument("--prompt", help="Initial prompt to the model (if omitted, interactive mode).")
+    parser.add_argument("--yes", action="store_true", help="Auto-confirm scans (use with care).")
+    parser.add_argument("--force", action="store_true", help="Allow scanning non-private targets (unsafe).")
+    args = parser.parse_args()
+
+    model_name = args.model
+
+    if args.prompt:
+        user_prompt = args.prompt
+    else:
+        try:
+            print("Enter what you want the assistant to do (examples: 'Scan 127.0.0.1 for SSH and HTTP'):")
+            user_prompt = input("> ").strip()
+        except KeyboardInterrupt:
+            print("\nInterrupted.")
+            sys.exit(0)
+        if not user_prompt:
+            print("No prompt given, exiting.")
+            sys.exit(0)
+
+    try:
+        action = ask_model_for_action(user_prompt, model_name=model_name)
+    except Exception as e:
+        print("ERROR parsing model output:", e, file=sys.stderr)
+        sys.exit(1)
+
+    print("[*] Model action:", action)
+
+    if action.get("action") == "question":
+        print("[MODEL QUESTION] ", action.get("text"))
+        sys.exit(0)
+
+    if action.get("action") == "explain":
+        print("[MODEL EXPLANATION]\n", action.get("text"))
+        sys.exit(0)
+
+    if action.get("action") != "scan":
+        print("Unknown action from model:", action.get("action"))
+        sys.exit(1)
+
+    target = action.get("target")
+    ports = action.get("ports")
+    flags = action.get("flags")
+
+    if not target:
+        print("No target provided by model.")
+        sys.exit(1)
+
+    if not args.force and not target_allowed(target):
+        print(f"Refusing to scan '{target}' because it's not localhost/private. Use --force to override.")
+        sys.exit(1)
+
+    if not args.yes:
+        print(f"About to run Nmap on target {target} (ports={ports}, flags={flags}). Type 'yes' to continue:")
+        confirm = input("> ").strip().lower()
+        if confirm not in ("y", "yes"):
+            print("Aborted by user.")
+            sys.exit(0)
+
+    # Try llm-tools-nmap integration first
+    if _llm_tools is not None:
+        try:
+            print("[*] Trying llm-tools-nmap integration...")
+            result = run_llm_tools_nmap(target=target, ports=ports, flags=flags)
+            print("[*] llm-tools-nmap result:")
+            print(json.dumps(result, indent=2))
+            sys.exit(0)
+        except Exception as e:
+            print("[!] llm-tools-nmap integration failed or not applicable:", e, file=sys.stderr)
+            print("[*] Falling back to direct nmap run.")
+
+    # Fallback: run nmap directly and parse XML
+    res = run_nmap_direct(target=target, ports=ports, flags=flags)
+    print("[*] Parsed result:")
+    print(json.dumps(res, indent=2))
+
+
+if __name__ == "__main__":
+    main()
diff --git a/test1.py b/test1.py
new file mode 100644
index 0000000..b4f8608
--- /dev/null
+++ b/test1.py
@@ -0,0 +1,210 @@
+#!/usr/bin/env python3
+"""
+Ollama + Nmap agent (service-version aware, robust fallback).
+
+Changes:
+ - Adds '-sV' (and '--script=mysql-info' for port 3306) when the user asks for service versions.
+ - Detects when llm-tools-nmap output lacks version info and reruns nmap locally.
+ - Avoids premature sys.exit() before fallback.
+"""
+
+import argparse, json, re, shlex, subprocess, sys, inspect, ipaddress, importlib
+from typing import Optional, Dict, Any, Iterable
+
+# Try to import ollama
+try:
+    import ollama
+except Exception:
+    print("ERROR: 'ollama' package not installed. Install using: pip install ollama", file=sys.stderr)
+    raise
+
+# Try to import llm-tools-nmap (optional)
+_llm_tools = None
+for name in ("llm_tools_nmap", "llm-tools-nmap", "llm_tools.nmap", "llm_tools_nmap_py"):
+    try:
+        _llm_tools = importlib.import_module(name)
+        print(f"[*] Imported llm-tools module: {name}")
+        break
+    except Exception:
+        pass
+
+# ---------- UTILITIES ----------
+
+def target_allowed(target: str) -> bool:
+    try:
+        ip = ipaddress.ip_address(target)
+        return ip.is_loopback or ip.is_private
+    except Exception:
+        return target.lower() in ("localhost", "127.0.0.1", "::1")
+
+def run_nmap_direct(target: str, ports: Optional[str], flags: Optional[str]) -> Dict[str, Any]:
+    cmd = ["nmap", "-oX", "-"]
+    if flags:
+        cmd += shlex.split(flags)
+    if ports:
+        cmd += ["-p", ports]
+    cmd.append(target)
+    print("[*] Running direct nmap:", " ".join(cmd))
+    proc = subprocess.run(cmd, capture_output=True, text=True)
+    if not proc.stdout.strip():
+        return {"error": proc.stderr.strip()}
+
+    import xml.etree.ElementTree as ET
+    try:
+        root = ET.fromstring(proc.stdout)
+        hosts = []
+        for host in root.findall("host"):
+            addr = host.find("address")
+            ipaddr = addr.get("addr") if addr is not None else None
+            hostdict = {"ip": ipaddr, "ports": []}
+            for p in host.findall(".//port"):
+                portnum = p.get("portid")
+                proto = p.get("protocol")
+                # the port state lives in the 'state' attribute of the <state> element,
+                # not in its text content
+                state_el = p.find("state")
+                state = state_el.get("state") if state_el is not None else None
+                svc_el = p.find("service")
+                service = svc_el.get("name") if svc_el is not None else None
+                version = svc_el.get("version") if svc_el is not None else None
+                product = svc_el.get("product") if svc_el is not None else None
+                hostdict["ports"].append({
+                    "port": portnum,
+                    "protocol": proto,
+                    "state": state,
+                    "service": service,
+                    "product": product,
+                    "version": version
+                })
+            hosts.append(hostdict)
+        return {"hosts": hosts}
+    except Exception as e:
+        return {"error": str(e), "raw_xml": proc.stdout}
+
+def run_llm_tools_nmap(target: str, ports: Optional[str], flags: Optional[str]):
+    if _llm_tools is None:
+        raise ImportError("llm-tools-nmap not found")
+
+    for fn_name in ("run_scan", "nmap_scan", "scan", "do_scan"):
+        if hasattr(_llm_tools, fn_name):
+            fn = getattr(_llm_tools, fn_name)
+            try:
+                return fn(target, ports, flags)
+            except TypeError:
+                try:
+                    return fn(target, ports)
+                except TypeError:
+                    return fn(target)
+    raise RuntimeError("llm-tools-nmap found, but no callable method worked")
+
+SYSTEM_INSTRUCTION = """
+You are an assistant that must produce JSON ONLY describing a single action.
+Valid actions:
+ {"action":"scan","target":"<ip-or-hostname>","ports":"22,80","flags":"-sS -Pn"}
+"""
+
+def normalize_ollama_response(resp):
+    if isinstance(resp, dict):
+        return resp.get("message", {}).get("content", "")
+    return str(resp)
+
+def extract_first_json(text: str) -> Optional[str]:
+    start, depth = None, 0
+    for i, ch in enumerate(text):
+        if ch == "{":
+            if start is None:
+                start = i
+            depth += 1
+        elif ch == "}":
+            depth -= 1
+            if depth == 0 and start is not None:
+                return text[start:i+1]
+    return None
+
+def ask_model_for_action(user_prompt: str, model_name: str) -> Dict[str, Any]:
+    resp = ollama.chat(model=model_name, messages=[
+        {"role": "system", "content": SYSTEM_INSTRUCTION},
+        {"role": "user", "content": user_prompt}
+    ])
+    txt = normalize_ollama_response(resp)
+    jtxt = extract_first_json(txt)
+
+    if not jtxt:
+        # No balanced object found; try to treat the whole reply as JSON-like
+        jtxt = txt.strip()
+
+    # Replace single quotes with double quotes
+    jtxt = jtxt.replace("'", '"')
+
+    # Remove trailing commas
+    jtxt = re.sub(r',\s*}', '}', jtxt)
+    jtxt = re.sub(r',\s*]', ']', jtxt)
+
+    try:
+        return json.loads(jtxt)
+    except json.JSONDecodeError as e:
+        print("ERROR parsing JSON from model:", e)
+        print("Raw text:", txt)
+        return {}
+
+def user_requested_version(user_prompt: str, action: Dict[str, Any]) -> bool:
+    up = (user_prompt or "").lower()
+    fl = (action.get("flags") or "").lower()
+    return any(k in up for k in ("version", "service version")) or "-sv" in fl or "-a" in fl
+
+# ---------- MAIN ----------
+
+def main():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--model", default="dolphin-llama3:8b")
+    parser.add_argument("--prompt")
+    parser.add_argument("--yes", action="store_true")
+    parser.add_argument("--force", action="store_true")
+    args = parser.parse_args()
+
+    user_prompt = args.prompt or input("> ").strip()
+    action = ask_model_for_action(user_prompt, args.model)
+    print("[*] Model action:", action)
+
+    if action.get("action") != "scan":
+        print("Model returned non-scan action.")
+        return
+
+    # 'or ""' guards against a null "flags" value in the model's JSON
+    target, ports, flags = action.get("target"), action.get("ports"), action.get("flags") or ""
+    if not target:
+        print("No target provided by model.")
+        return
+
+    if user_requested_version(user_prompt, action):
+        if "-sV" not in flags:
+            flags += " -sV"
+            print("[*] Added '-sV'")
+        if "3306" in (ports or ""):
+            flags += " --script=mysql-info"
+            print("[*] Added '--script=mysql-info'")
+
+    if not args.force and not target_allowed(target):
+        print(f"[!] Refusing external scan: {target}")
+        return
+
+    if _llm_tools:
+        try:
+            print("[*] Trying llm-tools-nmap integration...")
+            result = run_llm_tools_nmap(target, ports, flags)
+            if isinstance(result, str):
+                print("[*] llm-tools-nmap result:\n", result)
+                # If no version info detected, run fallback
+                if "version" not in result.lower() and "mysql" in result.lower():
+                    print("[*] No version info found, running local fallback with -sV ...")
+                    parsed = run_nmap_direct(target, ports, flags)
+                    print(json.dumps(parsed, indent=2))
+                else:
+                    print("[*] Done (llm-tools output).")
+                return
+            else:
+                print(json.dumps(result, indent=2))
+                return
+        except Exception as e:
+            print("[!] llm-tools-nmap failed:", e)
+
+    # Fallback
+    parsed = run_nmap_direct(target, ports, flags)
+    print("[*] Parsed result:")
+    print(json.dumps(parsed, indent=2))
+
+
+if __name__ == "__main__":
+    main()
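+
+# Example run (assumes a local ollama server with the model pulled, and nmap on PATH):
+#   python3 test1.py --prompt "Scan 127.0.0.1 port 3306 and report the MySQL version" --yes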