Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 47 additions & 1 deletion src/ai_guardian/daemon/multi_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,51 @@ def get_violation_context(
}
return self._rest_request(target, "POST", "/api/violation-context", body)

def scan_path(
self,
target: DaemonTarget,
path: str,
) -> Optional[dict]:
"""Run a file/directory scan on the target daemon."""
if target.runtime == "local":
return self._local_scan(path)
return self._rest_request(
target, "POST", "/api/scan",
body={"path": path}, timeout=120.0,
)

@staticmethod
def _local_scan(path: str) -> dict:
"""Run scan locally via FileScanner."""
import time
from pathlib import Path as _Path
from ai_guardian.scanner import FileScanner
from ai_guardian.tui.pattern_editor import config_section_for_rule_id
from ai_guardian.web.config_helpers import load_web_config

config = load_web_config()
scanner = FileScanner(config=config)
resolved = _Path(path).resolve()

t0 = time.monotonic()
findings = scanner.scan_directory(path=str(resolved))
elapsed_ms = round((time.monotonic() - t0) * 1000)

base = resolved if resolved.is_dir() else resolved.parent
for f in findings:
f["config_section"] = config_section_for_rule_id(
f.get("rule_id", "")
)
fp = f.get("file_path", "")
if fp and not _Path(fp).is_absolute():
f["file_path"] = str(base / fp)

return {
"findings": findings,
"scanned_files": len(findings),
"scan_time_ms": elapsed_ms,
}

def get_metrics(
self,
target: DaemonTarget,
Expand Down Expand Up @@ -598,6 +643,7 @@ def _rest_request(
method: str,
path: str,
body: Optional[dict] = None,
timeout: float = REQUEST_TIMEOUT,
) -> Optional[dict]:
"""Send HTTP request to daemon REST API."""
if target.url:
Expand Down Expand Up @@ -629,7 +675,7 @@ def _rest_request(
req.add_header("Authorization", f"Bearer {target.auth_token}")

try:
with urlopen(req, timeout=REQUEST_TIMEOUT) as resp:
with urlopen(req, timeout=timeout) as resp:
return json.loads(resp.read().decode("utf-8"))
except (URLError, OSError, json.JSONDecodeError, ValueError) as e:
logger.debug("REST request failed (%s %s): %s", method, url, e)
Expand Down
60 changes: 60 additions & 0 deletions src/ai_guardian/daemon/rest_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ def do_POST(self):
if body is None:
return
self._handle_redact(body)
elif self.path == "/api/scan":
body = self._read_body()
if body is None:
return
self._handle_scan(body)
else:
self._send_error(404, "Not found")

Expand Down Expand Up @@ -428,6 +433,61 @@ def _handle_violation_context(self, body):
)
self._send_json(result)

_BLOCKED_SCAN_DIRS = frozenset([
"/etc", "/usr", "/bin", "/sbin", "/var", "/sys", "/proc", "/dev",
"/boot", "/lib", "/lib64",
])

def _handle_scan(self, body):
"""Handle POST /api/scan — file/directory security scanning."""
import time as _time
from pathlib import Path as _Path

path = body.get("path", "")
if not path:
self._send_error(400, "path is required")
return

resolved = _Path(path).resolve()
if not resolved.exists():
self._send_error(404, f"Path does not exist: {path}")
return

resolved_str = str(resolved)
for blocked in self._BLOCKED_SCAN_DIRS:
if resolved_str == blocked or resolved_str.startswith(blocked + "/"):
self._send_error(403, "Scanning system directories is not allowed")
return

try:
from ai_guardian.scanner import FileScanner
from ai_guardian.tui.pattern_editor import config_section_for_rule_id

cfg = self.server.daemon_state.get_config()
scanner = FileScanner(config=cfg)

t0 = _time.monotonic()
findings = scanner.scan_directory(path=resolved_str)
elapsed_ms = round((_time.monotonic() - t0) * 1000)

base = resolved if resolved.is_dir() else resolved.parent
for f in findings:
f["config_section"] = config_section_for_rule_id(
f.get("rule_id", "")
)
fp = f.get("file_path", "")
if fp and not _Path(fp).is_absolute():
f["file_path"] = str(base / fp)

self._send_json({
"findings": findings,
"scanned_files": len(findings),
"scan_time_ms": elapsed_ms,
})
except Exception as e:
logger.error("Scan endpoint failed: %s", e)
self._send_error(500, "Internal error during scan")

def _handle_prompt(self, body):
"""Handle POST /api/prompt — delegate ask dialog to subprocess.

Expand Down
28 changes: 21 additions & 7 deletions src/ai_guardian/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,13 @@ def __init__(
self.verbose = verbose
self.findings: List[Dict[str, Any]] = []

# Initialize scanners with feature-specific sub-configs
self.ssrf_protector = SSRFProtector(config.get("ssrf_protection", {})) if HAS_SSRF else None
# Initialize scanners with feature-specific sub-configs.
# Force action to "block" — scanner must never trigger interactive
# ask dialogs (SSRFProtector.check() shows tkinter popups when
# action is "ask").
ssrf_cfg = dict(config.get("ssrf_protection", {}))
ssrf_cfg["action"] = "block"
self.ssrf_protector = SSRFProtector(ssrf_cfg) if HAS_SSRF else None
self.unicode_detector = UnicodeAttackDetector(config.get("prompt_injection", {})) if HAS_UNICODE else None

self._image_config: Optional[Dict[str, Any]] = None
Expand Down Expand Up @@ -201,7 +206,9 @@ def scan_directory(
path: str,
include_patterns: Optional[List[str]] = None,
exclude_patterns: Optional[List[str]] = None,
config_only: bool = False
config_only: bool = False,
progress_callback=None,
cancel_event=None,
) -> List[Dict[str, Any]]:
"""
Scan directory for security issues.
Expand All @@ -211,9 +218,11 @@ def scan_directory(
include_patterns: File patterns to include (glob style)
exclude_patterns: File patterns to exclude (glob style)
config_only: Only scan AI config files
progress_callback: Optional callable(file_path, index, total)
cancel_event: Optional threading.Event — set to stop scan early

Returns:
List of findings
List of findings (partial if cancelled)
"""
self.findings = []
scan_path = Path(path).resolve()
Expand All @@ -223,13 +232,13 @@ def scan_directory(
return self.findings

if scan_path.is_file():
# Scan single file
if progress_callback:
progress_callback(str(scan_path), 1, 1)
if self._is_scannable_image(scan_path):
self._scan_image_file(scan_path, scan_path.parent)
else:
self._scan_file(scan_path, scan_path.parent)
else:
# Scan directory
files_to_scan = self._discover_files(
scan_path,
include_patterns,
Expand All @@ -240,7 +249,12 @@ def scan_directory(
if self.verbose:
print(f"Scanning {len(files_to_scan)} files...")

for file_path in files_to_scan:
total = len(files_to_scan)
for i, file_path in enumerate(files_to_scan):
if cancel_event and cancel_event.is_set():
break
if progress_callback:
progress_callback(str(file_path), i + 1, total)
if self._is_scannable_image(file_path):
self._scan_image_file(file_path, scan_path)
else:
Expand Down
3 changes: 2 additions & 1 deletion src/ai_guardian/tui/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def copy_to_system_clipboard(text: str) -> Tuple[Optional[str], Optional[str]]:
return (None, "OSC 52")
return ("Clipboard command failed", None)


# ai-guardian:begin-allow
NAV_GROUPS = [
("Security Overview", [
("Security Dashboard", "panel-security-dashboard"),
Expand Down Expand Up @@ -790,6 +790,7 @@ def copy_to_system_clipboard(text: str) -> Tuple[Optional[str], Optional[str]]:
" [bold]r[/bold] Refresh health checks"
),
}
# ai-guardian:end-allow


class HelpModal(ModalScreen):
Expand Down
Loading
Loading