diff --git a/README.md b/README.md index 8dba152..669fd1e 100644 --- a/README.md +++ b/README.md @@ -179,7 +179,7 @@ And with the configuration file instead: mcpserver start -t http --port 8089 --config ./examples/jobspec/mcpserver.yaml ``` -We will provide examples for jobspec translation functions in [fractale-mcp](https://github.com/compspec/fractale-mcp). +We will provide examples for jobspec translation functions in [fractale](https://github.com/compspec/fractale-mcp) and the agent in [fractale-agents](https://github.com/converged-computing/fractale-agents). ### Kubernetes (kind) @@ -243,6 +243,9 @@ The mcp-server can register worker hubs, which are other MCP servers that regist ```bash # Start a hub in one terminal mcpserver start --hub --hub-secret potato + +# Start in dual mode (not recommended for production, primarily for experiments) +mcpserver start --dual --hub-secret potato ``` In another terminal, start a worker using the token that is generated. Add some functions for fun. @@ -273,7 +276,11 @@ mcpserver start --config examples/jobspec/mcpserver.yaml --join http://0.0.0.0:8 If you are doing experiments, you can bring up a hub the same way: ```bash +# Start as a standalone hub (recommended) mcpserver start --hub --hub-secret potato + +# Start in dual mode (not recommended for production or performance experiments) +mcpserver start --dual --hub-secret potato ``` To mock (simulate) a worker, add `--mock`, optionally with a particular archetype (one of `hpc`, `cloud`, or `standalone`). A worker ID is suggested to make the seed reproducible. 
diff --git a/mcpserver/cli/args.py b/mcpserver/cli/args.py index f52956e..1ea53de 100644 --- a/mcpserver/cli/args.py +++ b/mcpserver/cli/args.py @@ -79,6 +79,12 @@ def populate_start_args(start): help="Run the hub in serial mode (ideal for experiments on single machines)", default=False, ) + # Dual mode means "I am a hub AND a worker" - added this for dispatch experiments + hub_group.add_argument( + "--dual", + action="store_true", + help="Start as both a Hub and a Worker (registers local resources to the fleet).", + ) # Worker Registration Group worker_group = start.add_argument_group("🐝 Worker Registration") diff --git a/mcpserver/cli/start.py b/mcpserver/cli/start.py index 2ad0d9a..7d63b74 100644 --- a/mcpserver/cli/start.py +++ b/mcpserver/cli/start.py @@ -15,7 +15,7 @@ from mcpserver.app import init_mcp from mcpserver.cli.manager import get_manager from mcpserver.core.config import MCPConfig -from mcpserver.core.hub import HubManager +from mcpserver.core.hub import DualHubManager, HubManager from mcpserver.core.worker import WorkerManager from mcpserver.logger import logger @@ -64,7 +64,9 @@ def main(args, extra, **kwargs): app = FastAPI(title="MCP Server", lifespan=mcp_app.lifespan) # Setup Hub (parent role) - if args.hub: + if args.dual: + mcp.hub_manager = DualHubManager.from_args(mcp, args) + elif args.hub: mcp.hub_manager = HubManager.from_args(mcp, args) # Setup Worker (child role) - triggered by --join. We require join secret. @@ -86,8 +88,8 @@ async def lifespan(app: FastAPI): app = FastAPI(title="MCP Server", lifespan=lifespan) - # Bind the /register endpoint if we are a Hub - if args.hub: + # Bind the /register endpoint if we are a Hub (or Hub and Worker) + if args.hub or args.dual: mcp.hub_manager.bind_to_app(app) # Mount the MCP server. 
Note from V: we can use mount with antother FastMCP diff --git a/mcpserver/core/base.py b/mcpserver/core/base.py new file mode 100644 index 0000000..98547ba --- /dev/null +++ b/mcpserver/core/base.py @@ -0,0 +1,143 @@ +import collections +import json +import time + +import mcpserver.utils as utils +from mcpserver.logger import logger + + +class WorkerBase: + """ + A WorkerBase provides worker interaction functions, e.g., negotiate, status, + ask secretary. We provide it here so that a hub can use it to generate + its dual mode (acting as worker AND hub.) + """ + + def jsonify_response(self, result): + """ + Ensure we get the text, and separate and parse tool calls, + which the agent will return in a verbose mode. + """ + if isinstance(result, dict): + return result + if not isinstance(result, str) and hasattr(result, "content"): + result = result.content[0].text + + # Audit the tool calls (Did the agent just get lucky?) + calls = [] + if "CALLS" in result: + try: + result, calls_block = result.split("CALLS") + calls = utils.format_calls(calls_block) + except: + print(f"Issue parsing calls, agent had malformed response: {result}") + pass + + result = json.loads(utils.extract_code_block(result)) + result["calls"] = calls + return result + + def init_providers(self, mock=False): + """ + Probe the local system on startup. E.g., "we found spack, flux, etc." + These can be faux (mock) or real discovered providers + """ + # Not required unless serving a worker or hub. 
+ from resource_secretary.apps import discover_applications + from resource_secretary.providers import discover_providers + from resource_secretary.providers.mock import discover_mock_providers + + # We can use apps in mock or regular + apps = discover_applications() + logger.info("📡 Probing local system for resource providers...") + if mock: + self.catalog = discover_mock_providers(self.worker_id, choice=mock) + else: + self.catalog = discover_providers() + self.catalog.update(apps) + + def register_agent_tools(self): + """ + Registers the core negotiation tools with the FastMCP instance. + """ + + @self.mcp.tool(name="get_status") + async def get_status() -> dict: + """ + Returns the Level 1 Static Manifest of this cluster. + Use this to verify hardware, software providers, and site info. + """ + return { + "worker_id": self.worker_id, + "timestamp": time.time(), + "manifest": self.manifest, + } + + @self.mcp.tool(name="ask_secretary") + async def ask_secretary(request: str) -> dict: + """ + Wakes up the local Secretary Agent to perform a Level 2 investigation. + Use this to ask about specific software availability, queue depth, or node health. + """ + from resource_secretary.agents.secretary import SecretaryAgent + + # Flatten the catalog into a list of active provider instances + active_providers = [inst for category in self.catalog.values() for inst in category] + + # Verbose mode returns a second block with CALLS + agent = SecretaryAgent(active_providers, verbose=self.verbose) + proposal = await agent.negotiate(request) + return {"worker_id": self.worker_id, "proposal": proposal} + + @self.mcp.tool(name="submit") + async def receive_job(request: str) -> dict: + """ + Receive a job. Accepts a job request, invokes the local Secretary to + generate a spec, submit it, and verify the job ID. 
+ """ + from resource_secretary.agents.secretary import SecretaryAgent + + active_providers = [inst for cat in self.catalog.values() for inst in cat] + + agent = SecretaryAgent(active_providers, verbose=self.verbose) + raw_result = await agent.submit(request) + try: + receipt = self.jsonify_response(raw_result) + except Exception as e: + receipt = {"status": "FAILED", "reasoning": raw_result, "error": str(e)} + + return {"worker_id": self.worker_id, "receipt": receipt} + + @self.mcp.tool(name="export_provider_metadata") + def export_provider_metadata() -> str: + """ + Iterates through all providers and returns their internal 'truth' state. + This tool is 'hidden' from the Secretary Agent but used by the Hub. + """ + truth_map = {} + tool_registry = collections.defaultdict(list) + + # Self.catalog is a dict: {"software": [MockSpackProvider, ...]} + for category, providers in self.catalog.items(): + truth_map[category] = {} + for p in providers: + # We check if the provider has the export_truth method + if hasattr(p, "export_truth"): + truth_map[category][p.name] = p.export_truth() + else: + # Fallback to standard metadata if not a mock + truth_map[category][p.name] = p.metadata + + # Capture all Secretary Tools for this provider + # We can use this for simulations to assess what the agent + # should have called (vs. 
what it did) + manifest = p.discover_tools(tool_types=["secretary"]) + for tool_name in manifest.keys(): + tool_registry[category].append(f"{p.name}.{tool_name}") + + metadata = {"truth": truth_map, "registry": dict(tool_registry)} + + # If we have an archetype (mocking something) save it + if hasattr(p, "archetype"): + metadata["metadata"] = {"archetype": p.archetype.name} + return json.dumps(metadata, indent=2) diff --git a/mcpserver/core/hub.py b/mcpserver/core/hub.py index b52154b..2e22a8d 100644 --- a/mcpserver/core/hub.py +++ b/mcpserver/core/hub.py @@ -2,6 +2,7 @@ import json import random import secrets +import socket import time from typing import Any, Dict, Optional @@ -12,6 +13,8 @@ import mcpserver.utils as utils from mcpserver.logger import logger +from .base import WorkerBase + class HubManager: """ @@ -19,24 +22,51 @@ class HubManager: reflects their tools, and manages federated job negotiation. """ - def __init__(self, mcp, host: str, port: int, secret: str = None, batch=None, serial=False): + def __init__( + self, + mcp, + host: str, + port: int, + secret: str = None, + batch=None, + serial=False, + dual=False, + hub_id=None, + path="/mcp", + mock=False, + verbose: Optional[bool] = False, + ): + # Probably can simplify some of this between worker and hub self.mcp = mcp self.host = host self.port = port + self.path = path self.secret = secret or secrets.token_urlsafe(32) self.workers: Dict[str, Dict[str, Any]] = {} + self.verbose = verbose + + # For use if we are also a worker. 
+ self.worker_id = hub_id or socket.gethostname() + self.mock = mock # Make requests to hub in batches, in serial, or in parallel - self.set_running_mode(batch, serial) + self.set_running_mode(batch, serial, dual) # Track registered proxies to prevent ValueError on worker re-registration self._registered_proxies = set() - - self.registration_url = f"http://{host}:{port}/register" self._print_banner() self._register_hub_tools() - def set_running_mode(self, batch_size=None, serial=False): + @property + def url(self): + # This is running with uvicorn that serves the ssl + return f"http://{self.host}:{self.port}" + + @property + def registration_url(self): + return f"{self.url}/register" + + def set_running_mode(self, batch_size=None, serial=False, dual=False): """ Set the function to call the fleet. If we are worried about rate limits or running experiments, @@ -59,14 +89,18 @@ def set_running_mode(self, batch_size=None, serial=False): self.batch_size = batch_size self.semaphore = asyncio.Semaphore(batch_size) self.run_on_fleet = self.run_on_fleet_batched - logger.info(f"🚦 Hub initialized with Batch Size: {batch_size}") + logger.info(f"🚦 Hub initialized with Batch Size: {batch_size} Worker mode: {dual}") + + # If we are also running as a worker, add ourselves to the fleet + self.dual = dual @classmethod def from_args(cls, mcp, args) -> Optional["HubManager"]: """ Create a HubManager from CLI arguments. """ - if not getattr(args, "hub", False): + # Running in hub or dual mode? 
+ if not getattr(args, "hub", False) and not getattr(args, "dual", False): return None return cls( mcp, @@ -75,6 +109,11 @@ def from_args(cls, mcp, args) -> Optional["HubManager"]: secret=args.hub_secret, batch=args.batch, serial=args.serial, + dual=args.dual, + mock=args.mock, + verbose=args.verbose, + # server path + path=args.path, ) def _print_banner(self): @@ -170,7 +209,7 @@ async def dispatch_job(worker_id: str, prompt: str) -> dict: async with info["client"] as sess: result = await sess.call_tool("submit", {"request": prompt}) - return json.loads(utils.extract_code_block(result.content[0].text)) + return self.jsonify_response(result) @self.mcp.tool(name="negotiate_job") async def negotiate_job(prompt: str) -> dict: @@ -214,6 +253,7 @@ async def negotiate_handler(wid, sess): mcp_result = await sess.call_tool("ask_secretary", {"request": prompt}) raw_text = mcp_result.content[0].text + # TODO: vsoch: add support to parse the calls here too (like dispatch) try: # Parse and handle potential quote issues in LLM JSON proposal_data = json.loads(utils.extract_code_block(raw_text)) @@ -356,3 +396,27 @@ def _create_proxy(self, worker_id: str, tool: Tool): except Exception as e: logger.error(f"❌ Failed to generate dynamic proxy for {tool.name}: {e}") + + +class DualHubManager(WorkerBase, HubManager): + """ + Combined hub and worker base. Aka, a hub that also serves as a worker + """ + + def __init__(self, *args, **kwargs): + # Calls super on the HubManager. WorkerBase has no init + super().__init__(*args, **kwargs) + self.setup_dual() + self.init_providers(kwargs.get("mock", False)) + + def setup_dual(self): + """ + Setup dual mode, which means adding ourselves to the fleet. 
+ """ + hub_id = self.worker_id or socket.gethostname() + default_url = f"http://{self.host}:{self.port}{self.path}" + self.workers[hub_id] = { + "url": self.registration_url, + "client": Client(default_url), + } + self.register_agent_tools() diff --git a/mcpserver/core/worker.py b/mcpserver/core/worker.py index 1363357..863152c 100644 --- a/mcpserver/core/worker.py +++ b/mcpserver/core/worker.py @@ -1,20 +1,16 @@ import asyncio -import collections -import json import socket -import time from typing import Any, Dict, Optional import httpx -from resource_secretary.providers import discover_providers -from resource_secretary.providers.mock import discover_mock_providers from rich import print -import mcpserver.utils as utils from mcpserver.logger import logger +from .base import WorkerBase -class WorkerManager: + +class WorkerManager(WorkerBase): """ A generic worker mcpserver that discovers its own capabilities and context using the resource-secretary library. @@ -49,17 +45,6 @@ def __init__( # Register MCP Tools automatically self.register_agent_tools() - def init_providers(self, mock=False): - """ - Probe the local system on startup. E.g., "we found spack, flux, etc." - These can be faux (mock) or real discovered providers - """ - logger.info("📡 Probing local system for resource providers...") - if mock: - self.catalog = discover_mock_providers(self.worker_id, choice=mock) - else: - self.catalog = discover_providers() - def show(self): """ Show providers installed and verbosity. @@ -93,92 +78,6 @@ def parse_labels(self, label_list: Optional[list]) -> dict: labels[k.strip()] = v.strip() return labels - def register_agent_tools(self): - """ - Registers the core negotiation tools with the FastMCP instance. - """ - - @self.mcp.tool(name="get_status") - async def get_status() -> dict: - """ - Returns the Level 1 Static Manifest of this cluster. - Use this to verify hardware, software providers, and site info. 
- """ - return { - "worker_id": self.worker_id, - "timestamp": time.time(), - "manifest": self.manifest, - } - - @self.mcp.tool(name="ask_secretary") - async def ask_secretary(request: str) -> dict: - """ - Wakes up the local Secretary Agent to perform a Level 2 investigation. - Use this to ask about specific software availability, queue depth, or node health. - """ - from resource_secretary.agents.secretary import SecretaryAgent - - # Flatten the catalog into a list of active provider instances - active_providers = [inst for category in self.catalog.values() for inst in category] - - # Verbose mode returns a second block with CALLS - agent = SecretaryAgent(active_providers, verbose=self.verbose) - proposal = await agent.negotiate(request) - return {"worker_id": self.worker_id, "proposal": proposal} - - @self.mcp.tool(name="submit") - async def receive_job(request: str) -> dict: - """ - Receive a job. Accepts a job request, invokes the local Secretary to - generate a spec, submit it, and verify the job ID. - """ - from resource_secretary.agents.secretary import SecretaryAgent - - active_providers = [inst for cat in self.catalog.values() for inst in cat] - - agent = SecretaryAgent(active_providers) - raw_result = await agent.submit(request) - try: - receipt = json.loads(utils.extract_code_block(raw_result)) - except: - receipt = {"status": "FAILED", "reasoning": raw_result} - - return {"worker_id": self.worker_id, "receipt": receipt} - - @self.mcp.tool(name="export_provider_metadata") - def export_provider_metadata() -> str: - """ - Iterates through all providers and returns their internal 'truth' state. - This tool is 'hidden' from the Secretary Agent but used by the Hub. 
- """ - truth_map = {} - tool_registry = collections.defaultdict(list) - - # Self.catalog is a dict: {"software": [MockSpackProvider, ...]} - for category, providers in self.catalog.items(): - truth_map[category] = {} - for p in providers: - # We check if the provider has the export_truth method - if hasattr(p, "export_truth"): - truth_map[category][p.name] = p.export_truth() - else: - # Fallback to standard metadata if not a mock - truth_map[category][p.name] = p.metadata - - # Capture all Secretary Tools for this provider - # We can use this for simulations to assess what the agent - # should have called (vs. what it did) - manifest = p.discover_tools(tool_types=["secretary"]) - for tool_name in manifest.keys(): - tool_registry[category].append(f"{p.name}.{tool_name}") - - metadata = {"truth": truth_map, "registry": dict(tool_registry)} - - # If we have an archetype (mocking something) save it - if hasattr(p, "archetype"): - metadata["metadata"] = {"archetype": p.archetype.name} - return json.dumps(metadata, indent=2) - async def run_registration(self): """ Registers the worker with the Hub. 
diff --git a/mcpserver/tools/system/system.py b/mcpserver/tools/system/system.py index 411e1b6..a679d70 100644 --- a/mcpserver/tools/system/system.py +++ b/mcpserver/tools/system/system.py @@ -1,13 +1,8 @@ import os import time -from typing import Any, Dict, List - -from resource_secretary.agents.backends import get_backend -from resource_secretary.agents.secretary import SecretaryAgent -from resource_secretary.providers import discover_providers +from typing import Any, Dict from mcpserver.tools.base import BaseTool -from mcpserver.tools.decorator import mcp class SystemTool(BaseTool): @@ -16,6 +11,8 @@ class SystemTool(BaseTool): """ def setup(self, manager=None): + from resource_secretary.providers import discover_providers + self.manager = manager self.catalog = discover_providers() @@ -42,6 +39,12 @@ async def ask_secretary(self, request: str, verbose=False) -> Dict[str, Any]: """ Wakes up the local Secretary Agent using the configured backend. """ + try: + from resource_secretary.agents.backends import get_backend + from resource_secretary.agents.secretary import SecretaryAgent + except ImportError: + return {"proposal": "This cluster cannot access resources.", "status": "SUCCESS"} + # Resolve the backend instance on-demand backend = get_backend( backend_type=self.backend_config["type"], diff --git a/mcpserver/utils/text.py b/mcpserver/utils/text.py index 89d824b..c2b591b 100644 --- a/mcpserver/utils/text.py +++ b/mcpserver/utils/text.py @@ -2,6 +2,9 @@ def sanitize(name: str) -> str: + """ + Sanitize worker ids and arguments for hub properties. + """ # Replace hyphens/dots with underscores clean = name.replace("-", "_").replace(".", "_") # Python identifiers cannot start with a digit @@ -10,8 +13,18 @@ def sanitize(name: str) -> str: return clean -def format_rules(rules): - return "\n".join([f"- {r}" for r in rules]) +def format_calls(calls_block): + """ + The secretary agent can return calls. We need to ensure we try + to get and parse them correctly. 
+ """ + calls = [] + try: + calls = extract_code_block(calls_block) + return calls + except Exception as e: + print(f"Issue in format calls: {e}") + return calls_block def extract_code_block(text): diff --git a/mcpserver/version.py b/mcpserver/version.py index 2ec0cf2..19149e1 100644 --- a/mcpserver/version.py +++ b/mcpserver/version.py @@ -1,4 +1,4 @@ -__version__ = "0.0.16" +__version__ = "0.0.17" AUTHOR = "Vanessa Sochat" AUTHOR_EMAIL = "vsoch@users.noreply.github.com" NAME = "mcp-serve"