Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ And with the configuration file instead:
mcpserver start -t http --port 8089 --config ./examples/jobspec/mcpserver.yaml
```

We will provide examples for jobspec translation functions in [fractale-mcp](https://github.com/compspec/fractale-mcp).
We will provide examples for jobspec translation functions in [fractale](https://github.com/compspec/fractale-mcp) and the agent in [fractale-agents](https://github.com/converged-computing/fractale-agents).

### Kubernetes (kind)

Expand Down Expand Up @@ -243,6 +243,9 @@ The mcp-server can register worker hubs, which are other MCP servers that regist
```bash
# Start a hub in one terminal
mcpserver start --hub --hub-secret potato

# Start in dual mode (not recommended for production, primarily for experiments)
mcpserver start --dual --hub-secret potato
```

In another terminal, start a worker using the token that is generated. Add some functions for fun.
Expand Down Expand Up @@ -273,7 +276,11 @@ mcpserver start --config examples/jobspec/mcpserver.yaml --join http://0.0.0.0:8
If you are doing experiments, you can bring up a hub the same way:

```bash
# Start as a standalone hub (recommended)
mcpserver start --hub --hub-secret potato

# Start in dual mode (not recommended for production or performance experiments)
mcpserver start --dual --hub-secret potato
```

To mock (simulate) a worker, add `--mock`, optionally with a particular archetype (one of `hpc`, `cloud`, or `standalone`). A worker ID is suggested to make the seed reproducible.
Expand Down
6 changes: 6 additions & 0 deletions mcpserver/cli/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ def populate_start_args(start):
help="Run the hub in serial mode (ideal for experiments on single machines)",
default=False,
)
# Dual mode means "I am a hub AND a worker" - added this for dispatch experiments
hub_group.add_argument(
"--dual",
action="store_true",
help="Start as both a Hub and a Worker (registers local resources to the fleet).",
)

# Worker Registration Group
worker_group = start.add_argument_group("🐝 Worker Registration")
Expand Down
10 changes: 6 additions & 4 deletions mcpserver/cli/start.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from mcpserver.app import init_mcp
from mcpserver.cli.manager import get_manager
from mcpserver.core.config import MCPConfig
from mcpserver.core.hub import HubManager
from mcpserver.core.hub import DualHubManager, HubManager
from mcpserver.core.worker import WorkerManager
from mcpserver.logger import logger

Expand Down Expand Up @@ -64,7 +64,9 @@ def main(args, extra, **kwargs):
app = FastAPI(title="MCP Server", lifespan=mcp_app.lifespan)

# Setup Hub (parent role)
if args.hub:
if args.dual:
mcp.hub_manager = DualHubManager.from_args(mcp, args)
elif args.hub:
mcp.hub_manager = HubManager.from_args(mcp, args)

# Setup Worker (child role) - triggered by --join. We require join secret.
Expand All @@ -86,8 +88,8 @@ async def lifespan(app: FastAPI):

app = FastAPI(title="MCP Server", lifespan=lifespan)

# Bind the /register endpoint if we are a Hub
if args.hub:
# Bind the /register endpoint if we are a Hub (or Hub and Worker)
if args.hub or args.dual:
mcp.hub_manager.bind_to_app(app)

# Mount the MCP server. Note from V: we can use mount with another FastMCP
Expand Down
143 changes: 143 additions & 0 deletions mcpserver/core/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import collections
import json
import time

import mcpserver.utils as utils
from mcpserver.logger import logger


class WorkerBase:
    """
    A WorkerBase provides worker interaction functions, e.g., negotiate, status,
    ask secretary. We provide it here so that a hub can use it to generate
    its dual mode (acting as worker AND hub).

    NOTE(review): subclasses are expected to provide self.mcp (a FastMCP
    instance), self.worker_id, self.verbose, and self.manifest — confirm
    against the concrete worker/hub managers that mix this in.
    """

    def jsonify_response(self, result):
        """
        Ensure we get the text, and separate and parse tool calls,
        which the agent will return in a verbose mode.

        Accepts a dict (returned unchanged), a raw string, or an MCP tool
        result object exposing a `content` list of text blocks. Returns a
        dict parsed from the response's code block, with a "calls" key
        holding any parsed tool-call audit trail (empty list if none).
        """
        if isinstance(result, dict):
            return result
        if not isinstance(result, str) and hasattr(result, "content"):
            result = result.content[0].text

        # Audit the tool calls (Did the agent just get lucky?)
        calls = []
        if "CALLS" in result:
            try:
                # Split only on the FIRST marker: a second occurrence of
                # "CALLS" inside the block would otherwise make the 2-tuple
                # unpack raise ValueError.
                result, calls_block = result.split("CALLS", 1)
                calls = utils.format_calls(calls_block)
            except Exception:
                # Log (not print) so the malformed response is captured in
                # the server logs; fall through with whatever we have.
                logger.warning(f"Issue parsing calls, agent had malformed response: {result}")

        result = json.loads(utils.extract_code_block(result))
        result["calls"] = calls
        return result

    def init_providers(self, mock=False):
        """
        Probe the local system on startup. E.g., "we found spack, flux, etc."
        These can be faux (mock) or real discovered providers.

        Sets self.catalog, a dict mapping category name to a list of
        provider instances, augmented with discovered applications.
        """
        # Not required unless serving a worker or hub, so import lazily here.
        from resource_secretary.apps import discover_applications
        from resource_secretary.providers import discover_providers
        from resource_secretary.providers.mock import discover_mock_providers

        # We can use apps in mock or regular
        apps = discover_applications()
        logger.info("📡 Probing local system for resource providers...")
        if mock:
            # mock doubles as the archetype choice when truthy (e.g. "hpc")
            self.catalog = discover_mock_providers(self.worker_id, choice=mock)
        else:
            self.catalog = discover_providers()
        self.catalog.update(apps)

    def register_agent_tools(self):
        """
        Registers the core negotiation tools with the FastMCP instance.

        Exposes: get_status (static manifest), ask_secretary (agent
        investigation), submit (job submission), and
        export_provider_metadata (hub-facing truth/registry export).
        """

        @self.mcp.tool(name="get_status")
        async def get_status() -> dict:
            """
            Returns the Level 1 Static Manifest of this cluster.
            Use this to verify hardware, software providers, and site info.
            """
            return {
                "worker_id": self.worker_id,
                "timestamp": time.time(),
                "manifest": self.manifest,
            }

        @self.mcp.tool(name="ask_secretary")
        async def ask_secretary(request: str) -> dict:
            """
            Wakes up the local Secretary Agent to perform a Level 2 investigation.
            Use this to ask about specific software availability, queue depth, or node health.
            """
            from resource_secretary.agents.secretary import SecretaryAgent

            # Flatten the catalog into a list of active provider instances
            active_providers = [inst for category in self.catalog.values() for inst in category]

            # Verbose mode returns a second block with CALLS
            agent = SecretaryAgent(active_providers, verbose=self.verbose)
            proposal = await agent.negotiate(request)
            return {"worker_id": self.worker_id, "proposal": proposal}

        @self.mcp.tool(name="submit")
        async def receive_job(request: str) -> dict:
            """
            Receive a job. Accepts a job request, invokes the local Secretary to
            generate a spec, submit it, and verify the job ID.
            """
            from resource_secretary.agents.secretary import SecretaryAgent

            active_providers = [inst for cat in self.catalog.values() for inst in cat]

            agent = SecretaryAgent(active_providers, verbose=self.verbose)
            raw_result = await agent.submit(request)
            try:
                receipt = self.jsonify_response(raw_result)
            except Exception as e:
                # Surface the raw agent output so a parse failure is debuggable
                receipt = {"status": "FAILED", "reasoning": raw_result, "error": str(e)}

            return {"worker_id": self.worker_id, "receipt": receipt}

        @self.mcp.tool(name="export_provider_metadata")
        def export_provider_metadata() -> str:
            """
            Iterates through all providers and returns their internal 'truth' state.
            This tool is 'hidden' from the Secretary Agent but used by the Hub.
            """
            truth_map = {}
            tool_registry = collections.defaultdict(list)

            # Track the last provider seen so the archetype check below does
            # not raise NameError when the catalog is empty.
            last_provider = None

            # Self.catalog is a dict: {"software": [MockSpackProvider, ...]}
            for category, providers in self.catalog.items():
                truth_map[category] = {}
                for p in providers:
                    last_provider = p
                    # We check if the provider has the export_truth method
                    if hasattr(p, "export_truth"):
                        truth_map[category][p.name] = p.export_truth()
                    else:
                        # Fallback to standard metadata if not a mock
                        truth_map[category][p.name] = p.metadata

                    # Capture all Secretary Tools for this provider
                    # We can use this for simulations to assess what the agent
                    # should have called (vs. what it did)
                    manifest = p.discover_tools(tool_types=["secretary"])
                    for tool_name in manifest.keys():
                        tool_registry[category].append(f"{p.name}.{tool_name}")

            metadata = {"truth": truth_map, "registry": dict(tool_registry)}

            # If we have an archetype (mocking something) save it
            if last_provider is not None and hasattr(last_provider, "archetype"):
                metadata["metadata"] = {"archetype": last_provider.archetype.name}
            return json.dumps(metadata, indent=2)
80 changes: 72 additions & 8 deletions mcpserver/core/hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import random
import secrets
import socket
import time
from typing import Any, Dict, Optional

Expand All @@ -12,31 +13,60 @@
import mcpserver.utils as utils
from mcpserver.logger import logger

from .base import WorkerBase


class HubManager:
"""
A hub manager is a central coordinator that aggregates worker clusters,
reflects their tools, and manages federated job negotiation.
"""

def __init__(self, mcp, host: str, port: int, secret: str = None, batch=None, serial=False):
def __init__(
self,
mcp,
host: str,
port: int,
secret: str = None,
batch=None,
serial=False,
dual=False,
hub_id=None,
path="/mcp",
mock=False,
verbose: Optional[bool] = False,
):
# Probably can simplify some of this between worker and hub
self.mcp = mcp
self.host = host
self.port = port
self.path = path
self.secret = secret or secrets.token_urlsafe(32)
self.workers: Dict[str, Dict[str, Any]] = {}
self.verbose = verbose

# For use if we are also a worker.
self.worker_id = hub_id or socket.gethostname()
self.mock = mock

# Make requests to hub in batches, in serial, or in parallel
self.set_running_mode(batch, serial)
self.set_running_mode(batch, serial, dual)

# Track registered proxies to prevent ValueError on worker re-registration
self._registered_proxies = set()

self.registration_url = f"http://{host}:{port}/register"
self._print_banner()
self._register_hub_tools()

def set_running_mode(self, batch_size=None, serial=False):
@property
def url(self):
# This is running with uvicorn that serves the ssl
return f"http://{self.host}:{self.port}"

@property
def registration_url(self):
return f"{self.url}/register"

def set_running_mode(self, batch_size=None, serial=False, dual=False):
"""
Set the function to call the fleet.
If we are worried about rate limits or running experiments,
Expand All @@ -59,14 +89,18 @@ def set_running_mode(self, batch_size=None, serial=False):
self.batch_size = batch_size
self.semaphore = asyncio.Semaphore(batch_size)
self.run_on_fleet = self.run_on_fleet_batched
logger.info(f"🚦 Hub initialized with Batch Size: {batch_size}")
logger.info(f"🚦 Hub initialized with Batch Size: {batch_size} Worker mode: {dual}")

# If we are also running as a worker, add ourselves to the fleet
self.dual = dual

@classmethod
def from_args(cls, mcp, args) -> Optional["HubManager"]:
"""
Create a HubManager from CLI arguments.
"""
if not getattr(args, "hub", False):
# Running in hub or dual mode?
if not getattr(args, "hub", False) and not getattr(args, "dual", False):
return None
return cls(
mcp,
Expand All @@ -75,6 +109,11 @@ def from_args(cls, mcp, args) -> Optional["HubManager"]:
secret=args.hub_secret,
batch=args.batch,
serial=args.serial,
dual=args.dual,
mock=args.mock,
verbose=args.verbose,
# server path
path=args.path,
)

def _print_banner(self):
Expand Down Expand Up @@ -170,7 +209,7 @@ async def dispatch_job(worker_id: str, prompt: str) -> dict:

async with info["client"] as sess:
result = await sess.call_tool("submit", {"request": prompt})
return json.loads(utils.extract_code_block(result.content[0].text))
return self.jsonify_response(result)

@self.mcp.tool(name="negotiate_job")
async def negotiate_job(prompt: str) -> dict:
Expand Down Expand Up @@ -214,6 +253,7 @@ async def negotiate_handler(wid, sess):
mcp_result = await sess.call_tool("ask_secretary", {"request": prompt})
raw_text = mcp_result.content[0].text

# TODO: vsoch: add support to parse the calls here too (like dispatch)
try:
# Parse and handle potential quote issues in LLM JSON
proposal_data = json.loads(utils.extract_code_block(raw_text))
Expand Down Expand Up @@ -356,3 +396,27 @@ def _create_proxy(self, worker_id: str, tool: Tool):

except Exception as e:
logger.error(f"❌ Failed to generate dynamic proxy for {tool.name}: {e}")


class DualHubManager(WorkerBase, HubManager):
    """
    Combined hub and worker base. Aka, a hub that also serves as a worker.

    The worker side registers itself into the hub's own fleet (self.workers)
    and exposes the WorkerBase agent tools on the shared FastMCP instance.
    """

    def __init__(self, *args, **kwargs):
        # Calls super on the HubManager. WorkerBase has no init.
        super().__init__(*args, **kwargs)
        self.setup_dual()
        # Use self.mock (set by HubManager.__init__) rather than
        # kwargs.get("mock", ...), which silently misses a positionally
        # passed mock argument.
        self.init_providers(self.mock)

    def setup_dual(self):
        """
        Setup dual mode, which means adding ourselves to the fleet.
        """
        # worker_id already defaults to the hostname in HubManager.__init__;
        # fall back again here in case it was cleared.
        hub_id = self.worker_id or socket.gethostname()
        default_url = f"http://{self.host}:{self.port}{self.path}"
        # NOTE(review): "url" stores the registration endpoint while the
        # client connects to the MCP path — confirm other fleet entries use
        # the same convention.
        self.workers[hub_id] = {
            "url": self.registration_url,
            "client": Client(default_url),
        }
        self.register_agent_tools()
Loading
Loading