Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

A Model Context Protocol (MCP) server for Kubernetes that enables AI assistants like Claude, Cursor, and others to interact with Kubernetes clusters through natural language.

[![Trust Score](https://archestra.ai/mcp-catalog/api/badge/quality/rohitg00/kubectl-mcp-server)](https://archestra.ai/mcp-catalog/rohitg00__kubectl-mcp-server)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
[![Python](https://img.shields.io/badge/python-3.9+-blue.svg)](https://www.python.org/)
[![Kubernetes](https://img.shields.io/badge/kubernetes-%23326ce5.svg?style=flat&logo=kubernetes&logoColor=white)](https://kubernetes.io/)
Expand Down
837 changes: 837 additions & 0 deletions compatible_servers 2/cursor/cursor_compatible_mcp_server.py

Large diffs are not rendered by default.

313 changes: 313 additions & 0 deletions compatible_servers 2/generic/kubectl_jsonrpc_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,313 @@
#!/usr/bin/env python3
"""
Simple JSON-RPC based MCP server for kubectl operations.
This server uses standard JSON-RPC 2.0 to handle MCP protocol requests.
"""

import asyncio
import json
import os
import sys
import subprocess
import traceback
from datetime import datetime

# Create logs directory
LOG_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "logs")
os.makedirs(LOG_DIR, exist_ok=True)

# Open debug log file
debug_file = open(os.path.join(LOG_DIR, "kubectl_jsonrpc_debug.log"), "w")

def log(message):
"""Write a log message to the debug file and stderr."""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
debug_msg = f"{timestamp} - {message}"
print(f"DEBUG: {debug_msg}", file=sys.stderr)
debug_file.write(f"{debug_msg}\n")
debug_file.flush()

log(f"Starting kubectl JSON-RPC server")
log(f"Python version: {sys.version}")
log(f"Current directory: {os.getcwd()}")

# Server information
SERVER_NAME = "kubectl-mcp-tool"
SERVER_VERSION = "0.1.0"

async def read_line():
"""Read a line from stdin asynchronously."""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, sys.stdin.readline)

def write_response(response):
"""Write a response to stdout."""
json_str = json.dumps(response)
log(f"Sending response: {json_str}")
print(json_str, flush=True)

def run_kubectl_command(command):
"""Run a kubectl command and return the result."""
log(f"Running kubectl command: {command}")
try:
result = subprocess.run(
command, shell=True, check=True, capture_output=True, text=True
)
return {
"command": command,
"output": result.stdout.strip(),
"success": True
}
except subprocess.CalledProcessError as e:
log(f"kubectl command failed: {e}")
return {
"command": command,
"output": e.stderr.strip(),
"success": False,
"error": str(e)
}
except Exception as e:
log(f"Error running kubectl command: {e}")
return {
"command": command,
"output": f"Error: {str(e)}",
"success": False,
"error": str(e)
}

def process_natural_language(query):
"""Process a natural language query for kubectl operations."""
log(f"Processing natural language query: {query}")

query_lower = query.lower()

if "get pods" in query_lower:
command = "kubectl get pods"
if "namespace" in query_lower:
parts = query_lower.split("namespace")
if len(parts) > 1:
namespace = parts[1].strip()
command = f"kubectl get pods -n {namespace}"
return run_kubectl_command(command)

elif "namespaces" in query_lower:
command = "kubectl get namespaces"
return run_kubectl_command(command)

elif "current context" in query_lower:
command = "kubectl config current-context"
return run_kubectl_command(command)

else:
# Try as direct kubectl command
command = f"kubectl {query}"
return run_kubectl_command(command)

async def run_server():
"""Run the JSON-RPC server for MCP."""
log("Starting server loop")

try:
while True:
log("Waiting for message")
line = await read_line()
if not line:
log("Empty line received, exiting")
break

log(f"Received: {line.strip()}")

try:
request = json.loads(line)
request_id = request.get("id", 0)
method = request.get("method")

if method == "initialize":
# MCP initialization
log("Handling initialize method")
response = {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"name": SERVER_NAME,
"version": SERVER_VERSION,
"capabilities": {
"tools": [
{
"name": "process_natural_language",
"description": "Process natural language queries for kubectl operations",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Natural language query"
}
},
"required": ["query"]
}
},
{
"name": "get_pods",
"description": "Get all pods in the specified namespace",
"parameters": {
"type": "object",
"properties": {
"namespace": {
"type": "string",
"description": "Optional namespace (default: current namespace)"
}
}
}
},
{
"name": "get_namespaces",
"description": "Get all namespaces",
"parameters": {
"type": "object",
"properties": {}
}
},
{
"name": "run_kubectl_command",
"description": "Run a raw kubectl command",
"parameters": {
"type": "object",
"properties": {
"command": {
"type": "string",
"description": "The kubectl command to run"
}
},
"required": ["command"]
}
}
]
}
}
}
write_response(response)

elif method == "callTool":
# Tool call handling
log("Handling callTool method")
params = request.get("params", {})
tool_name = params.get("name")
tool_params = params.get("parameters", {})

log(f"Tool call: {tool_name} with params {tool_params}")

if tool_name == "process_natural_language":
query = tool_params.get("query", "")
result = process_natural_language(query)
response = {
"jsonrpc": "2.0",
"id": request_id,
"result": result
}

elif tool_name == "get_pods":
namespace = tool_params.get("namespace", "")
cmd = f"kubectl get pods" + (f" -n {namespace}" if namespace else "")
result = run_kubectl_command(cmd)
response = {
"jsonrpc": "2.0",
"id": request_id,
"result": result
}

elif tool_name == "get_namespaces":
result = run_kubectl_command("kubectl get namespaces")
response = {
"jsonrpc": "2.0",
"id": request_id,
"result": result
}

elif tool_name == "run_kubectl_command":
cmd = tool_params.get("command", "")
result = run_kubectl_command(cmd)
response = {
"jsonrpc": "2.0",
"id": request_id,
"result": result
}

else:
response = {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": -32601,
"message": f"Unknown tool: {tool_name}"
}
}

write_response(response)

elif method == "shutdown":
# Handle shutdown request
log("Received shutdown request")
response = {
"jsonrpc": "2.0",
"id": request_id,
"result": None
}
write_response(response)
break

else:
# Unknown method
log(f"Unknown method: {method}")
response = {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": -32601,
"message": f"Method not found: {method}"
}
}
write_response(response)

except json.JSONDecodeError as e:
log(f"Failed to parse JSON: {e}")
error_response = {
"jsonrpc": "2.0",
"id": None,
"error": {
"code": -32700,
"message": f"Parse error: {str(e)}"
}
}
write_response(error_response)

except Exception as e:
log(f"Error processing request: {e}\n{traceback.format_exc()}")
error_response = {
"jsonrpc": "2.0",
"id": request.get("id") if "request" in locals() else None,
"error": {
"code": -32603,
"message": f"Internal error: {str(e)}"
}
}
write_response(error_response)

except Exception as e:
log(f"Fatal error: {e}\n{traceback.format_exc()}")
finally:
log("Server shutting down")

if __name__ == "__main__":
try:
# Set env var for unbuffered output
os.environ["PYTHONUNBUFFERED"] = "1"

# Run the server
asyncio.run(run_server())
except KeyboardInterrupt:
log("Server stopped by keyboard interrupt")
except Exception as e:
log(f"Fatal error: {e}\n{traceback.format_exc()}")
finally:
debug_file.close()
Loading