Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,4 @@ replay_pid*
# Third party JAR files from Ghidra
lib/*.jar

__pycache__/
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ MCP Server + Ghidra Plugin
- Decompile and analyze binaries in Ghidra
- Automatically rename methods and data
- List methods, classes, imports, and exports
- BSim integration for function similarity matching
- Connect to BSim databases (H2, PostgreSQL)
- Query individual functions for similar matches
- Batch query all functions in a program
- View similarity scores, confidence levels, and executable metadata

# Installation

Expand Down Expand Up @@ -108,6 +113,13 @@ Another MCP client that supports multiple models on the backend is [5ire](https:
- `Ghidra/Framework/SoftwareModeling/lib/SoftwareModeling.jar`
- `Ghidra/Framework/Utility/lib/Utility.jar`
- `Ghidra/Framework/Gui/lib/Gui.jar`
- `Ghidra/Features/BSim/lib/BSim.jar lib/BSim.jar`
- `Ghidra/Features/BSim/lib/commons-dbcp2-2.9.0.jar lib/commons-dbcp2-2.9.0.jar`
- `Ghidra/Features/BSim/lib/commons-logging-1.2.jar lib/commons-logging-1.2.jar`
- `Ghidra/Features/BSim/lib/commons-pool2-2.11.1.jar lib/commons-pool2-2.11.1.jar`
- `Ghidra/Features/BSim/lib/h2-2.2.220.jar lib/h2-2.2.220.jar`
- `Ghidra/Features/BSim/lib/postgresql-42.7.6.jar lib/postgresql-42.7.6.jar`
Comment on lines +116 to +121
Copy link

Copilot AI Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The copy instructions appear to have the destination path duplicated at the end instead of showing source -> destination. The format should match the pattern used in lines 107-115 where only the destination path is shown, or clarify the copy command syntax (e.g., 'cp source dest').

Suggested change
- `Ghidra/Features/BSim/lib/BSim.jar lib/BSim.jar`
- `Ghidra/Features/BSim/lib/commons-dbcp2-2.9.0.jar lib/commons-dbcp2-2.9.0.jar`
- `Ghidra/Features/BSim/lib/commons-logging-1.2.jar lib/commons-logging-1.2.jar`
- `Ghidra/Features/BSim/lib/commons-pool2-2.11.1.jar lib/commons-pool2-2.11.1.jar`
- `Ghidra/Features/BSim/lib/h2-2.2.220.jar lib/h2-2.2.220.jar`
- `Ghidra/Features/BSim/lib/postgresql-42.7.6.jar lib/postgresql-42.7.6.jar`
- `Ghidra/Features/BSim/lib/BSim.jar`
- `Ghidra/Features/BSim/lib/commons-dbcp2-2.9.0.jar`
- `Ghidra/Features/BSim/lib/commons-logging-1.2.jar`
- `Ghidra/Features/BSim/lib/commons-pool2-2.11.1.jar`
- `Ghidra/Features/BSim/lib/h2-2.2.220.jar`
- `Ghidra/Features/BSim/lib/postgresql-42.7.6.jar`

Copilot uses AI. Check for mistakes.

2. Build with Maven by running:

`mvn clean package assembly:single`
Expand Down
188 changes: 183 additions & 5 deletions bridge_mcp_ghidra.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
from mcp.server.fastmcp import FastMCP

DEFAULT_GHIDRA_SERVER = "http://127.0.0.1:8080/"
DEFAULT_REQUEST_TIMEOUT = 5

logger = logging.getLogger(__name__)

mcp = FastMCP("ghidra-mcp")

# Initialize ghidra_server_url with default value
ghidra_server_url = DEFAULT_GHIDRA_SERVER
# Initialize ghidra_request_timeout with default value
ghidra_request_timeout = DEFAULT_REQUEST_TIMEOUT

def safe_get(endpoint: str, params: dict = None) -> list:
"""
Expand All @@ -33,7 +36,7 @@ def safe_get(endpoint: str, params: dict = None) -> list:
url = urljoin(ghidra_server_url, endpoint)

try:
response = requests.get(url, params=params, timeout=5)
response = requests.get(url, params=params, timeout=ghidra_request_timeout)
response.encoding = 'utf-8'
if response.ok:
return response.text.splitlines()
Expand All @@ -46,9 +49,10 @@ def safe_post(endpoint: str, data: dict | str) -> str:
try:
url = urljoin(ghidra_server_url, endpoint)
if isinstance(data, dict):
response = requests.post(url, data=data, timeout=5)
# BSim queries might be a bit slower, using configurable timeout
response = requests.post(url, data=data, timeout=ghidra_request_timeout)
else:
response = requests.post(url, data=data.encode("utf-8"), timeout=5)
response = requests.post(url, data=data.encode("utf-8"), timeout=ghidra_request_timeout)
response.encoding = 'utf-8'
if response.ok:
return response.text.strip()
Expand Down Expand Up @@ -287,6 +291,174 @@ def list_strings(offset: int = 0, limit: int = 2000, filter: str = None) -> list
params["filter"] = filter
return safe_get("strings", params)

@mcp.tool()
def bsim_select_database(database_path: str) -> str:
"""
Select and connect to a BSim database for function similarity matching.

Args:
database_path: Path to BSim database file (e.g., "/path/to/database.bsim")
or URL (e.g., "postgresql://host:port/dbname")

Returns:
Connection status and database information
"""
return safe_post("bsim/select_database", {"database_path": database_path})

@mcp.tool()
def bsim_query_function(
function_address: str,
max_matches: int = 10,
similarity_threshold: float = 0.7,
confidence_threshold: float = 0.0,
max_similarity: float | None = None,
max_confidence: float | None = None,
offset: int = 0,
limit: int = 100,
) -> str:
"""
Query a single function against the BSim database to find similar functions.

Args:
function_address: Address of the function to query (e.g., "0x401000")
max_matches: Maximum number of matches to return (default: 10)
similarity_threshold: Minimum similarity score (inclusive, 0.0-1.0, default: 0.7)
confidence_threshold: Minimum confidence score (inclusive, 0.0-1.0, default: 0.0)
max_similarity: Maximum similarity score (exclusive, 0.0-1.0, default: unbounded)
max_confidence: Maximum confidence score (exclusive, 0.0-1.0, default: unbounded)
offset: Pagination offset (default: 0)
limit: Maximum number of results to return (default: 100)

Returns:
List of matching functions with similarity scores and metadata
"""
data = {
"function_address": function_address,
"max_matches": str(max_matches),
"similarity_threshold": str(similarity_threshold),
"confidence_threshold": str(confidence_threshold),
"offset": str(offset),
"limit": str(limit),
}

if max_similarity is not None:
data["max_similarity"] = str(max_similarity)
if max_confidence is not None:
data["max_confidence"] = str(max_confidence)

return safe_post("bsim/query_function", data)

@mcp.tool()
def bsim_query_all_functions(
max_matches_per_function: int = 5,
similarity_threshold: float = 0.7,
confidence_threshold: float = 0.0,
max_similarity: float | None = None,
max_confidence: float | None = None,
offset: int = 0,
limit: int = 100,
) -> str:
"""
Query all functions in the current program against the BSim database.
Returns an overview of matches for all functions.

Args:
max_matches_per_function: Max matches per function (default: 5)
similarity_threshold: Minimum similarity score (inclusive, 0.0-1.0, default: 0.7)
confidence_threshold: Minimum confidence score (inclusive, 0.0-1.0, default: 0.0)
max_similarity: Maximum similarity score (exclusive, 0.0-1.0, default: unbounded)
max_confidence: Maximum confidence score (exclusive, 0.0-1.0, default: unbounded)
offset: Pagination offset (default: 0)
limit: Maximum number of results to return (default: 100)

Returns:
Summary and detailed results for all matching functions
"""
data = {
"max_matches_per_function": str(max_matches_per_function),
"similarity_threshold": str(similarity_threshold),
"confidence_threshold": str(confidence_threshold),
"offset": str(offset),
"limit": str(limit),
}

if max_similarity is not None:
data["max_similarity"] = str(max_similarity)
if max_confidence is not None:
data["max_confidence"] = str(max_confidence)

return safe_post("bsim/query_all_functions", data)

@mcp.tool()
def bsim_disconnect() -> str:
"""
Disconnect from the current BSim database.

Returns:
Disconnection status message
"""
return safe_post("bsim/disconnect", {})

@mcp.tool()
def bsim_status() -> str:
"""
Get the current BSim database connection status.

Returns:
Current connection status and database path if connected
"""
return "\n".join(safe_get("bsim/status"))

@mcp.tool()
def bsim_get_match_disassembly(
executable_path: str,
function_name: str,
function_address: str,
) -> str:
"""
Get the disassembly of a specific BSim match. This requires the matched
executable to be available in the Ghidra project.

Args:
executable_path: Path to the matched executable (from BSim match result)
function_name: Name of the matched function
function_address: Address of the matched function (e.g., "0x401000")

Returns:
Function prototype and assembly code for the matched function.
Returns an error message if the program is not found in the project.
"""
return safe_post("bsim/get_match_disassembly", {
"executable_path": executable_path,
"function_name": function_name,
"function_address": function_address,
})

@mcp.tool()
def bsim_get_match_decompile(
executable_path: str,
function_name: str,
function_address: str,
) -> str:
"""
Get the decompilation of a specific BSim match. This requires the matched
executable to be available in the Ghidra project.

Args:
executable_path: Path to the matched executable (from BSim match result)
function_name: Name of the matched function
function_address: Address of the matched function (e.g., "0x401000")

Returns:
Function prototype and decompiled C code for the matched function.
Returns an error message if the program is not found in the project.
"""
return safe_post("bsim/get_match_decompile", {
"executable_path": executable_path,
"function_name": function_name,
"function_address": function_address,
})

def main():
parser = argparse.ArgumentParser(description="MCP server for Ghidra")
parser.add_argument("--ghidra-server", type=str, default=DEFAULT_GHIDRA_SERVER,
Expand All @@ -297,13 +469,19 @@ def main():
help="Port to run MCP server on (only used for sse), default: 8081")
parser.add_argument("--transport", type=str, default="stdio", choices=["stdio", "sse"],
help="Transport protocol for MCP, default: stdio")
parser.add_argument("--ghidra-timeout", type=int, default=DEFAULT_REQUEST_TIMEOUT,
help=f"MCP requests timeout, default: {DEFAULT_REQUEST_TIMEOUT}")
Copy link

Copilot AI Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The help text should specify the unit of the timeout value (seconds) to clarify what the numeric value represents.

Suggested change
help=f"MCP requests timeout, default: {DEFAULT_REQUEST_TIMEOUT}")
help=f"MCP requests timeout in seconds, default: {DEFAULT_REQUEST_TIMEOUT} seconds")

Copilot uses AI. Check for mistakes.
args = parser.parse_args()

# Use the global variable to ensure it's properly updated
global ghidra_server_url
if args.ghidra_server:
ghidra_server_url = args.ghidra_server


global ghidra_request_timeout
if args.ghidra_timeout:
ghidra_request_timeout = args.ghidra_timeout

if args.transport == "sse":
try:
# Set up logging
Expand Down
Loading
Loading