From 6a281e53c6e139568b7eb889bc40346261159e4f Mon Sep 17 00:00:00 2001 From: Huzaifa Sidhpurwala Date: Mon, 22 Sep 2025 12:39:58 +0400 Subject: [PATCH] First attempt to add mcp-security-server support to rapidast --- config/config-template-mcp.yaml | 22 ++++ config/schemas/6/rapidast_schema.json | 33 ++++++ configmodel/models/scanners/mcp.py | 27 +++++ scanners/mcp/mcp_none.py | 162 ++++++++++++++++++++++++++ scanners/mcp/mcp_podman.py | 8 ++ tests/scanners/mcp/test_mcp.py | 45 +++++++ 6 files changed, 297 insertions(+) create mode 100644 config/config-template-mcp.yaml create mode 100644 configmodel/models/scanners/mcp.py create mode 100644 scanners/mcp/mcp_none.py create mode 100644 scanners/mcp/mcp_podman.py create mode 100644 tests/scanners/mcp/test_mcp.py diff --git a/config/config-template-mcp.yaml b/config/config-template-mcp.yaml new file mode 100644 index 00000000..3e3435cd --- /dev/null +++ b/config/config-template-mcp.yaml @@ -0,0 +1,22 @@ +config: + configVersion: 6 + +application: + shortName: mcp-target + +scanners: + mcp: + parameters: + url: http://127.0.0.1:9001/mcp + transport: http + format: json + timeout: 30 + verbose: false + # only_health: true + # auth_type: bearer + # auth_token: "$TOKEN" + # For SSE transport example + # sse_endpoint: /sse + # transport: sse + # executable_path: mcp-scan + diff --git a/config/schemas/6/rapidast_schema.json b/config/schemas/6/rapidast_schema.json index cc5fc668..62630d5f 100644 --- a/config/schemas/6/rapidast_schema.json +++ b/config/schemas/6/rapidast_schema.json @@ -680,6 +680,39 @@ "parameters" ] } + , + { + "type": "object", + "description": "MCP Security Scanner", + "properties": { + "parameters": { + "type": "object", + "properties": { + "url": { "type": "string" }, + "transport": { "type": "string", "enum": ["auto", "http", "sse"] }, + "format": { "type": "string", "enum": ["text", "json"] }, + "timeout": { "type": "integer" }, + "verbose": { "type": "boolean" }, + "only_health": { "type": "boolean" }, + "session_id": { "type": "string" }, + "sse_endpoint": { "type": "string" }, + "auth_type": { "type": "string", "enum": ["bearer", "oauth2-client-credentials"] }, + "auth_token": { "type": "string" }, + "token_url": { "type": "string" }, + "client_id": { "type": "string" }, + "client_secret": { "type": "string" }, + "scope": { "type": "string" }, + "explain": { "type": "string" }, + "output": { "type": "string" } + }, + "required": ["url"] + }, + "executable_path": { + "type": "string" + } + }, + "required": ["parameters"] + } ] } } diff --git a/configmodel/models/scanners/mcp.py b/configmodel/models/scanners/mcp.py new file mode 100644 index 00000000..1e5e0677 --- /dev/null +++ b/configmodel/models/scanners/mcp.py @@ -0,0 +1,27 @@ +from dataclasses import dataclass +from dataclasses import field +from typing import Any +from typing import Dict + + +@dataclass +class McpConfig: + # Parameters passed to mcp-security-scanner CLI + parameters: Dict[str, Any] = field(default_factory=dict) + # Path to the mcp-scan executable (defaults to PATH lookup if just "mcp-scan") + executable_path: str = field(default="mcp-scan") + +from dataclasses import dataclass +from dataclasses import field +from typing import Any +from typing import Dict + + +@dataclass +class McpConfig: + # Parameters passed to mcp-security-scanner CLI + parameters: Dict[str, Any] = field(default_factory=dict) + # Path to the mcp-scan executable (defaults to PATH lookup if just "mcp-scan") + executable_path: str = field(default="mcp-scan") + + diff --git a/scanners/mcp/mcp_none.py b/scanners/mcp/mcp_none.py new file mode 100644 index 00000000..59fbaccc --- /dev/null +++ b/scanners/mcp/mcp_none.py @@ -0,0 +1,162 @@ +import json +import logging +import os +import shutil +import subprocess + +import dacite + +from configmodel import RapidastConfigModel +from configmodel import deep_traverse_and_replace_with_var_content +from configmodel.models.scanners.mcp import McpConfig +from scanners import RapidastScanner +from scanners import State + + +CLASSNAME = "Mcp" + + +class Mcp(RapidastScanner): + """Scanner wrapper for mcp-security-scanner CLI (HTTP transport). + + Expects `mcp-scan` to be installed (or configurable via executable_path). + Produces JSON or text reports; we copy outputs to results dir. If JSON contains + a SARIF-like run, we leave merging to RapiDAST's existing SARIF collector. + """ + + DEFAULT_OUTPUT_FILE = "mcp_scan_report.json" + + def __init__(self, config: RapidastConfigModel, ident: str = "mcp"): + super().__init__(config, ident) + self.workdir = self._create_temp_dir("workdir") + self.cfg = self._load_cfg(config, ident) + self.cli = [] + + def _load_cfg(self, config: RapidastConfigModel, ident: str) -> McpConfig: + section = config.subtree_to_dict(f"scanners.{ident}") + if section is None: + raise ValueError(f"'scanners.{ident}' section not in config") + processed = deep_traverse_and_replace_with_var_content(section) + return dacite.from_dict(data_class=McpConfig, data=processed) + + def setup(self): + if self.state != State.UNCONFIGURED: + raise RuntimeError(f"[MCP] unexpected state in setup: {self.state}") + + params = self.cfg.parameters or {} + + # Build CLI: `mcp-scan scan --url <...> [--transport http] [--format json] [--output ] ...` + executable = self.cfg.executable_path + self.output_path = os.path.join(self.workdir, self.DEFAULT_OUTPUT_FILE) + + self.cli = [ + executable, + "scan", + ] + + # map commonly used params 1:1 + flag_map = { + "url": "--url", + "transport": "--transport", + "format": "--format", + "timeout": "--timeout", + "verbose": "--verbose", + "only_health": "--only-health", + "session_id": "--session-id", + "sse_endpoint": "--sse-endpoint", + "auth_type": "--auth-type", + "auth_token": "--auth-token", + "token_url": "--token-url", + "client_id": "--client-id", + "client_secret": "--client-secret", + "scope": "--scope", + "explain": "--explain", + } + + for key, flag in flag_map.items(): + if key in params and params[key] is not None: + value = params[key] + if isinstance(value, bool): + if value: + self.cli.append(flag) + else: + self.cli.extend([flag, str(value)]) + + # default to JSON output so RapiDAST can ingest artifacts; allow override via parameters.format + if not any(p in self.cli for p in ("--format",)): + self.cli.extend(["--format", "json"]) + + # respect explicit output path if provided + if "output" in params and params["output"]: + self.output_path = params["output"] + else: + self.cli.extend(["--output", self.output_path]) + + logging.info(f"Prepared MCP scan CLI: {self.cli}") + + self.state = State.READY + + def run(self): + if self.state != State.READY: + raise RuntimeError(f"[MCP] unexpected state in run: {self.state}") + + logging.info("Running mcp-security-scanner") + try: + result = subprocess.run(self.cli, check=False) + except FileNotFoundError as exc: + logging.error( + f"MCP scanner executable not found at '{self.cfg.executable_path}'. Install mcp-security-scanner or adjust 'executable_path'" + ) + raise RuntimeError("mcp-scan not found") from exc + + if result.returncode == 0: + self.state = State.DONE + else: + logging.warning(f"mcp-scan exited with code {result.returncode}") + self.state = State.ERROR + + def postprocess(self): + if self.state != State.DONE: + raise RuntimeError("No post-processing as MCP scanning has not successfully run yet.") + + super().postprocess() + + try: + os.makedirs(self.results_dir, exist_ok=True) + # Copy output report if present + if os.path.isfile(self.output_path): + dest = os.path.join(self.results_dir, os.path.basename(self.output_path)) + shutil.copy(self.output_path, dest) + + # If the report already is SARIF, nothing to do. If JSON with a 'runs' SARIF-like, leave as-is. + # Otherwise, do minimal wrap: create an empty SARIF placeholder to not break merging. + try: + with open(dest, "r", encoding="utf-8") as f: + data = json.load(f) + if not (isinstance(data, dict) and "runs" in data): + sarif_path = os.path.join(self.results_dir, "mcp-empty.sarif.json") + with open(sarif_path, "w", encoding="utf-8") as f: + json.dump({"version": "2.1.0", "runs": []}, f) + except Exception: # pylint: disable=broad-exception-caught + logging.debug("Report is not JSON; skipping SARIF placeholder generation") + else: + logging.warning("MCP output file not found; producing empty SARIF placeholder") + sarif_path = os.path.join(self.results_dir, "mcp-empty.sarif.json") + with open(sarif_path, "w", encoding="utf-8") as f: + json.dump({"version": "2.1.0", "runs": []}, f) + + except Exception as exc: # pylint: disable=broad-exception-caught + logging.error(f"Unable to save MCP results: {exc}") + self.state = State.ERROR + + if self.state != State.ERROR: + self.state = State.PROCESSED + + def cleanup(self): + if self.state != State.PROCESSED: + raise RuntimeError(f"Unexpected state while cleaning up: PROCESSED != {self.state}") + + logging.debug(f"cleaning up: the tmp directory: {self.workdir}") + shutil.rmtree(self.workdir, ignore_errors=True) + + diff --git a/scanners/mcp/mcp_podman.py b/scanners/mcp/mcp_podman.py new file mode 100644 index 00000000..79686896 --- /dev/null +++ b/scanners/mcp/mcp_podman.py @@ -0,0 +1,8 @@ +CLASSNAME = "Mcp" + + +class Mcp: + def __init__(self, *args): + raise RuntimeError("MCP scanner is not supported with 'general.container.type=podman' config option") + + diff --git a/tests/scanners/mcp/test_mcp.py b/tests/scanners/mcp/test_mcp.py new file mode 100644 index 00000000..e6794ebf --- /dev/null +++ b/tests/scanners/mcp/test_mcp.py @@ -0,0 +1,45 @@ +import json +import os +from unittest.mock import patch + +import configmodel +import rapidast +from scanners.mcp.mcp_none import Mcp + + +def load_config(path: str): + data = rapidast.load_config(path) + return configmodel.RapidastConfigModel(data) + + +def test_mcp_cli_building(tmp_path): + cfg_path = os.path.join(os.path.dirname(__file__), "../../../config/config-template-mcp.yaml") + config = load_config(cfg_path) + m = Mcp(config=config) + m.setup() + # Must contain executable, subcommand, and url + assert m.cli[0].endswith("mcp-scan") or m.cli[0] == "mcp-scan" + assert m.cli[1] == "scan" + assert "--url" in m.cli + + +@patch("subprocess.run") +def test_mcp_run_and_postprocess(mock_run, tmp_path): + mock_run.return_value.returncode = 0 + cfg_path = os.path.join(os.path.dirname(__file__), "../../../config/config-template-mcp.yaml") + config = load_config(cfg_path) + m = Mcp(config=config) + m.setup() + m.run() + + # create a fake json output to be copied in postprocess + os.makedirs(m.workdir, exist_ok=True) + with open(os.path.join(m.workdir, m.DEFAULT_OUTPUT_FILE), "w", encoding="utf-8") as f: + json.dump({"runs": []}, f) + + m.postprocess() + + # Results dir should have the output + outputs = [p for p in os.listdir(m.results_dir) if p.endswith(".json")] + assert outputs, f"Expected output JSON in {m.results_dir}" +