diff --git a/README.md b/README.md index cedef96..8dba152 100644 --- a/README.md +++ b/README.md @@ -19,36 +19,12 @@ The library here has the following abstractions. - **tools**: server tools, prompts, and resources - **ui**: user interface that an engine (with a main manager) uses -- **core**: shared assets, primarily the plan/step/config definitions +- **core**: shared assets, primarily the plan/step/config definitions and worker/hub hierarchy roles - **routes**: server views not related to mcp. -- **backends**: child of an engine, these are the model services (llama, openai, gemini) - **databases**: how to save results as we progress in a pipeline (currently we support sqlite and filesystem JSON) For the above, the engines, tools, ui, databases, and backends are interfaces. -### Tools - -There are different means to add tools here: - - - **internal** are discovered in `mcpserver/tools` (assist the server). - - **external modules**: externally discovered via the same mechanism. - - **external one-off**: add a specific tool, prompt, or resource to a server (suggested) - -I am suggesting a combined approach of the first and last bullet for security. E.g., when we deploy, we do not want to open a hole to add functions that are not known. In the context of a job, we likely have a specific need or use case and can select from a library. I am developing scoped tools with this aim or goal -- to be able to deploy a job and start a server within the context of the job with exactly what is needed. Here is how the module discovery works: - -```python -from mcpserver.tools.manager import ToolManager - -# Discover and register defaults -manager = ToolManager() - -# The tools vendored here are automatically discovered.. -manager.register("mcpserver.tools") - -# Register a different module -manager.register("mymodule.tools") -``` - ## Development It is recommended to open in VSCode container. 
Then install: @@ -250,6 +226,184 @@ export SSL_CERT_FILE=$(pwd)/certs/cert.pem ``` And you'll see the server get hit. + +## Full Architecture + +### Starting a Hub + +You'll need to install support for the associated worker and resource discovery: + +```bash +pip install mcp-serve[hub] +pip install mcp-serve[all] +``` + +The mcp-server can act as a hub for workers, which are other MCP servers that register to it. To start the mcpserver as a hub: + +```bash +# Start a hub in one terminal +mcpserver start --hub --hub-secret potato +``` + +In another terminal, start a worker using the same secret. Add some functions for fun. + +```bash +# If it wants to write batch jobs. +pip install hpc-mcp --break-system-packages +mcpserver start --config examples/jobspec/mcpserver.yaml --join http://0.0.0.0:8000 --join-secret potato --port 7777 +``` + +Note that you can also set the secret in the environment. + +```bash +export MCPSERVER_JOIN_SECRET=potato +mcpserver start --config examples/jobspec/mcpserver.yaml --join http://0.0.0.0:8000 --port 7777 +``` + +You can also start a mock worker. By default, archetypes are chosen with 40/40/20 probability for hpc, cloud, and standalone. You can +also specify an archetype. + +```bash +mcpserver start --config examples/jobspec/mcpserver.yaml --join http://0.0.0.0:8000 --port 7777 --mock +mcpserver start --config examples/jobspec/mcpserver.yaml --join http://0.0.0.0:8000 --port 7777 --archetype hpc +``` + +### Mocking a Hub + +If you are doing experiments, you can bring up a hub the same way: + +```bash +mcpserver start --hub --hub-secret potato +``` + +To mock (simulate) a worker, add `--mock`, optionally with a particular archetype (one of `hpc`, `cloud`, or `standalone`). A worker ID is suggested to make the seed reproducible.
+ +```bash +mcpserver start --join http://0.0.0.0:8000 --port 7777 --worker-id 10 --mock --join-secret potato +mcpserver start --join http://0.0.0.0:8000 --port 7777 --worker-id 10 --mock hpc --join-secret potato +``` + +In another terminal, you can request to export the simulation "truth" - the metadata generated for the providers chosen for the archetype. + +```bash +mcpserver start --config examples/jobspec/mcpserver.yaml --join http://0.0.0.0:8000 --port 7777 --worker-id 10 +``` + +And export "truth" metadata. + +```bash +resource-ask export --output ground-truth.json +``` + +#### Manual Queries + +Test raw queries for status. These are manual, local queries. + +```bash +# Get a listing of workers and metadata +python3 ./examples/mcp-query.py + +# Get specific tool metadata from the worker +python3 ./examples/mcp-query.py http://localhost:7777/mcp get_status + +# Call a namespaced tool on the hub (e.g., get the status) +python3 ./examples/mcp-query.py http://localhost:8000/mcp n_781e903e4f10_get_status +``` + +You can test it without the join secret, or with a wrong join secret, to see it fail. + +### Resource Secretary Client + +This is the general client interface: + +```bash +# Includes request, asking secretaries, selection, and dispatch +resource-ask negotiate "I need " + +# Includes request and asking secretaries +resource-ask satisfy "I need " + +# Includes request, asking secretaries, and selection +resource-ask select "I need " + +# The same, but from a proposals file (json dict with data.proposal for each) +resource-ask select --proposals proposals.json "I need " + +# Dispatch directly to a named cluster +resource-ask dispatch "I need " +``` + +The `resource-ask` client can use a local model to run selection and other algorithms. You can also "roll your own" using the server endpoints, but this library already provides interfaces for doing and extending that.
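Rolling your own mostly means calling hub tools over MCP and reducing the proposals that come back. Here is a minimal sketch in Python, assuming the response shape used by `examples/negotiate/1-negotiate-job.py` (a `proposals` dict keyed by worker ID, each entry carrying a `status`):

```python
def summarize_proposals(data):
    # Reduce a negotiate_job response to {worker_id: verdict}.
    # The {"proposals": {wid: {"status": ...}}} shape is an assumption
    # based on examples/negotiate/1-negotiate-job.py.
    return {
        wid: str(response.get("status", "UNKNOWN")).upper()
        for wid, response in (data or {}).get("proposals", {}).items()
    }


async def ask_hub(url, prompt):
    # Imported lazily so the helper above works without fastmcp installed.
    from fastmcp import Client

    async with Client(url) as hub:
        result = await hub.call_tool("negotiate_job", {"prompt": prompt})
        return summarize_proposals(result.structured_content)


# Example (requires a running hub):
#   import asyncio
#   asyncio.run(ask_hub("http://localhost:8000/mcp", "Run cowsay on one node"))
```

The `resource-ask` client wraps this pattern (plus selection and dispatch), so a sketch like this is only needed if you want to extend or replace that layer.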
+ +```bash +pip install resource-secretary +``` + + +#### Negotiating a Job + +When a user has a request, it goes to the hub as a prompt. We use a prompt instead of a set of hard-coded policies, because it can technically say anything. E.g., + +> I have a paper due in 3 hours and I need to run LAMMPS. Find me at least 3 nodes and minimize time to completion. My budget is X. + +If you are using gemini or openai, make sure to install the libraries. + +```bash +pip install -e .[gemini] --break-system-packages +pip install -e .[openai] --break-system-packages +``` + +For the example, I like spack to be discoverable. We can install packages with spack to see how the responses change. + +```bash +git clone --depth 1 https://github.com/spack/spack /tmp/spack +export SPACK_ROOT=/tmp/spack +flux start +``` + +And start the worker after that. Since we are running in a VSCode environment, let's ask for a smaller scoped task. + +```bash +# Satisfy request +resource-ask satisfy "Can you run cowsay on one node?" + +# List selection algorithms +resource-ask list select + +# Negotiate (satisfy, select, and dispatch) with a selection algorithm +resource-ask negotiate "I need to run LAMMPS with 1 node." --select agentic +``` + +You'll notice that the interface suggests using "select" next. The above "negotiate" is akin to a satisfy request. We do the following: + +```console +[resource-ask] (client) --> [negotiate_job] (hub) --> [secretary_ask] (workers) --> return to hub --> [client] +``` + +A select would take this a step further and also select a worker. + +The above is working, and the response comes back! Next I need to work on the selection algorithm and delegation. +To start, I'll likely select randomly (a valid interface to choose) and then implement delegation. The remainder of the notes are from before. + +For this to work we: + +1.
Make a call to the mcp server hub to `negotiate_work` + - Negotiate work is going to prompt the secretary to send back a response with: + - a quick yes/no response that can eliminate contenders + - policy-specific metrics (e.g., estimated time to start, estimated cost, performance) + - Importantly, the hub will evaluate the importance of a set of factors for the job. E.g., "this job requires good network, completed in X time, under N cost, storage does not matter." It will come up with factors and weights (importance) and an equation for the factors and not tell the secretaries its relative weights. The hub will prepare a prompt that describes the needs and their importance, and provides reference points for the secretary agents. E.g., "Evaluate your network where 1.0 is 100Gbps InfiniBand and 0.0 is 1Gbps Ethernet." The secretaries will then evaluate the quality of their resources toward the goal, and send back scores and reasons/justification to evaluate each variable. We can test binary (0/1), requesting specific ranges, and normalized scores (0 to 1). The hub then just needs to evaluate the returned values against its equation. This needs to be a two-step process: first quantitative, and then an adjustment based on the qualitative. E.g., maybe a specific filesystem is given 0.5, but the secretary also notes it is undergoing a rebuild, so the hub decides to penalize it. +2. The hub sends the request to the child workers (each a different cluster) +3. Each child worker has a secretary that receives it. + - The secretary has metadata about the cluster that is discovered on startup that does not change (e.g., hardware) + - The secretary is also able to register handles to detailed discovery tools (e.g., for software you would run `spack find lammps`) + - The secretary makes a call to request state data like queue status + - The secretary also uses the discovery tools to look for the software of choice. +4.
Each secretary sends back their response - quantitative scores, plus qualitative reasons. +5. Each secretary has a trust score. It is based on two things: + - The actual discovery of resources is a known truth that is always returned. What the secretary says is compared against that. + - The actual performance of a job can be evaluated against what was promised. + - A trust score can (somehow) go into a future evaluation. + ### Design Choices Here are a few design choices (subject to change, of course). I am starting with re-implementing our fractale agents with this framework. For that, instead of agents being tied to specific functions (as classes on their agent functions) we will have a flexible agent class that changes function based on a chosen prompt. It will use mcp functions, prompts, and resources. In addition: @@ -263,7 +417,16 @@ Here are a few design choices (subject to change, of course). I am starting with ## TODO -- Full operator with Flux example (Flux operator with HPC apps and jobspec translation) +- [ ] should we be reporting utilization (e.g., mock or nvidia smi) if it might just be a login node? +- [ ] write a function to compare a reported agent result against truth? How? +- [ ] need a way to "pass forward" an error from a worker when, for example, an API key is not set. +- [ ] I want to have the equivalent of a satisfy endpoint, checking for the negotiate but not dispatch. +- [ ] I also want an equivalent "just submit to this cluster" endpoint. + +Idea: + +- the mcp-server worker should have a tool that generates a prompt for an agent. "Here is a request for lammps, this many nodes, and here are the resources we see (call to get_status, which also will be returned to the caller). Can we support it? Use your tools to figure it out." Then the created agent should use the tools in the same server it is generated in to answer that question. The response from the agent plus the status should return to the hub.
The hub can have the weighted equation to decide on a final cluster. +- TODO: ask agent which flux variables we should eliminate. ## License diff --git a/examples/jobspec/mcpserver.yaml b/examples/jobspec/mcpserver.yaml index ed9f0b4..d49ba12 100644 --- a/examples/jobspec/mcpserver.yaml +++ b/examples/jobspec/mcpserver.yaml @@ -1,7 +1,3 @@ tools: - path: flux_mcp.validate.flux_validate_jobspec - - path: flux_mcp.transformer.transform_jobspec - -prompts: - - path: flux_mcp.validate.flux_validate_jobspec_persona - - path: flux_mcp.transformer.transform_jobspec_persona \ No newline at end of file + - path: hpc_mcp.filesystem.filesystem_write_file \ No newline at end of file diff --git a/examples/mcp-query.py b/examples/mcp-query.py new file mode 100644 index 0000000..19f2077 --- /dev/null +++ b/examples/mcp-query.py @@ -0,0 +1,174 @@ +import asyncio +import json +import argparse +from fastmcp import Client +from rich.console import Console +from rich.tree import Tree +from rich.json import JSON +from rich.panel import Panel +from rich.table import Table + +console = Console() + +DEFAULT_URL = "http://localhost:8000/mcp" +DEFAULT_TOOL = "get_fleet_status" + +def render_fleet_tree(data: dict): + """ + Renders the Level 1 Hub/Worker hierarchy as a Tree. 
+ """ + tree = Tree("🌐 [bold cyan]mcpserver Fleet[/bold cyan]") + + if not data or not isinstance(data, dict): + tree.add("[yellow]No worker data returned.[/yellow]") + return tree + + for worker_id, info in data.items(): + # Status Icon + online = info.get("online", False) + icon = "āœ…" if online else "āŒ" + color = "green" if online else "red" + + # Worker Node + worker_node = tree.add(f"{icon} [bold {color}]{worker_id}[/bold {color}]") + worker_node.add(f"[dim]Type:[/dim] [yellow]{info.get('type', 'generic')}[/yellow]") + + if online: + status_data = info.get("status", {}) + meta_node = worker_node.add("šŸ“Š [bold white]Status Snapshot[/bold white]") + + # Extract key metrics for the tree view + if isinstance(status_data, dict): + # Check for hardware summary + if "hardware" in status_data: + hw = status_data["hardware"] + cpu = hw.get("cpu", {}).get("cores", "??") + mem = hw.get("memory", {}).get("total_gb", "??") + net = hw.get("network", {}).get("interconnect", "ethernet") + meta_node.add(f"Cores: [green]{cpu}[/green] | RAM: [green]{mem}GB[/green] | Net: [blue]{net}[/blue]") + + # Check for scheduler summary + if "scheduler" in status_data: + sch = status_data["scheduler"] + free = sch.get("cores", {}).get("free", "??") + pend = sch.get("queue", {}).get("pending", 0) + meta_node.add(f"Free Cores: [bold green]{free}[/bold green] | Pending: [bold yellow]{pend}[/bold yellow]") + + # Labels + labels = info.get("labels", {}) + if labels: + labels_node = worker_node.add("šŸ·ļø [bold blue]Labels[/bold blue]") + for lk, lv in labels.items(): + labels_node.add(f"{lk}: [blue]{lv}[/blue]") + else: + worker_node.add(f"[red]Error: {info.get('error', 'Unknown failure')}[/red]") + + return tree + +def render_negotiation_results(data: dict): + """ + Renders the Level 2 'negotiate_job' output as a side-by-side comparison table. 
+ """ + negotiation_id = data.get("negotiation_id", "Unknown") + table = Table(title=f"šŸ¤ Job Negotiation: {negotiation_id}", border_style="cyan", show_header=True, header_style="bold magenta") + + table.add_column("Cluster ID", style="bold magenta", width=15) + table.add_column("Type", style="yellow", width=12) + table.add_column("Descriptive Proposal / Reasoning", style="white") + table.add_column("Verdict", style="bold", width=15, justify="center") + + proposals = data.get("proposals", {}) + if not proposals: + return Panel("[red]No proposals were received from the fleet.[/red]") + + for wid, prop in proposals.items(): + p_type = prop.get("type", "unknown") + + if p_type == "agentic_proposal": + p_data = prop.get("data", {}) + # Extract reasoning and decision from the secretary agent's response + reasoning = p_data.get("reasoning", p_data.get("proposal_text", "No detailed reasoning provided.")) + verdict = str(p_data.get("status", p_data.get("decision", "Unknown"))).upper() + + # Color coding based on common response patterns + color = "green" if any(x in verdict.lower() for x in ["ready", "yes", "feasible", "success"]) else "yellow" + if "error" in verdict.lower() or "no" == verdict.lower(): + color = "red" + + table.add_row(wid, "šŸ¤– Secretary", reasoning, f"[{color}]{verdict}[/{color}]") + + elif p_type == "manifest_only": + reason = prop.get("reasoning", "Static manifest fallback.") + table.add_row(wid, "šŸ“œ Manifest", reason, "[dim]EVALUATING[/dim]") + + else: + msg = prop.get("message", "Connection or Tool error") + table.add_row(wid, "āŒ Error", f"[red]{msg}[/red]", "[bold red]OFFLINE[/bold red]") + + return table + +async def query_mcp(url, tool_name, prompt=None): + """ + Main query loop connecting to the FastMCP client. 
+ """ + console.print(f"[bold blue]šŸ“” Connecting to Hub:[/bold blue] {url}") + + try: + async with Client(url) as client: + # Prepare arguments for the tool call + # 'negotiate_job' expects 'prompt' + call_args = {} + if tool_name == "negotiate_job" and prompt: + call_args = {"prompt": prompt} + + with console.status(f"[bold yellow]Calling {tool_name}...[/bold yellow]"): + result = await client.call_tool(tool_name, call_args) + + # Extract data from FastMCP content block + data = result + if hasattr(result, "content") and len(result.content) > 0: + text_content = result.content[0].text + try: + # Clean potential single quotes and parse JSON + data = json.loads(text_content.replace("'", '"')) + except: + data = text_content + + # Visual Routing based on tool called + if tool_name == "get_fleet_status" and isinstance(data, dict): + console.print("\n") + console.print(Panel(render_fleet_tree(data), border_style="cyan", expand=False)) + + elif tool_name == "negotiate_job" and isinstance(data, dict): + console.print("\n") + console.print(render_negotiation_results(data)) + # Print raw JSON in a collapsed panel for debugging/verification + console.print(Panel(JSON.from_data(data), title="Raw Negotiation Data", border_style="dim", expand=False)) + + elif isinstance(data, (dict, list)): + console.print("\n") + console.print(Panel( + JSON.from_data(data), + title=f"[bold green]Result: {tool_name}[/bold green]", + border_style="green", + expand=False + )) + else: + console.print(f"\n[bold green]Result:[/bold green] {data}") + + except Exception as e: + console.print(f"\n[bold red]āŒ Request Failed:[/bold red] {e}") + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Query an MCP hub and render descriptive expressions.") + parser.add_argument("url", nargs="?", default=DEFAULT_URL, help=f"Server URL (default: {DEFAULT_URL})") + parser.add_argument("tool", nargs="?", default=DEFAULT_TOOL, help=f"Tool to call (default: {DEFAULT_TOOL})") + 
parser.add_argument("--prompt", help="The natural language job request for negotiate_job") + + args = parser.parse_args() + + # Interactive prompt for negotiation if not provided via CLI + if args.tool == "negotiate_job" and not args.prompt: + args.prompt = console.input("[bold yellow]Enter your job request (e.g., 'Run LAMMPS on 32 nodes'): [/bold yellow]") + + asyncio.run(query_mcp(args.url, args.tool, args.prompt)) \ No newline at end of file diff --git a/examples/negotiate/1-negotiate-job.py b/examples/negotiate/1-negotiate-job.py new file mode 100644 index 0000000..745b5a6 --- /dev/null +++ b/examples/negotiate/1-negotiate-job.py @@ -0,0 +1,39 @@ +import asyncio +import argparse +from fastmcp import Client +from rich.console import Console +from rich.panel import Panel +from rich.table import Table + +console = Console() + +async def run_negotiation(url, prompt): + console.print(f"šŸ¤ [bold cyan]Initiating Negotiation with Fleet...[/bold cyan]") + console.print(f"[dim]Request: {prompt}[/dim]\n") + + async with Client(url) as hub: + # Step 1: Negotiation. 
This is akin to a satisfy request + result = await hub.call_tool("negotiate_job", {"prompt": prompt}) + data = result.structured_content + + table = Table(title="Cluster Proposal Comparison", border_style="green") + table.add_column("Worker ID", style="magenta") + table.add_column("Proposal / Reasoning", style="white") + table.add_column("Verdict", justify="center") + + # We get back a set of proposals + for wid, response in data.get("proposals", {}).items(): + cluster_result = response.get("data", {}) + proposal_text = cluster_result.get("proposal", "No response.") + status = response.get("status", "UNKNOWN") + table.add_row(wid, proposal_text, f"[bold]{status}[/bold]") + + console.print(table) + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--url", default="http://localhost:8000/mcp") + parser.add_argument("prompt", help="The job description to negotiate") + args = parser.parse_args() + + asyncio.run(run_negotiation(args.url, args.prompt)) diff --git a/mcpserver/cli/__init__.py b/mcpserver/cli/__init__.py index 664bb01..c5074e7 100644 --- a/mcpserver/cli/__init__.py +++ b/mcpserver/cli/__init__.py @@ -30,7 +30,6 @@ def get_parser(): default=False, action="store_true", ) - parser.add_argument( "--quiet", dest="quiet", diff --git a/mcpserver/cli/args.py b/mcpserver/cli/args.py index ab09204..f52956e 100644 --- a/mcpserver/cli/args.py +++ b/mcpserver/cli/args.py @@ -1,6 +1,7 @@ #!/usr/bin/env python import os +import socket default_port = os.environ.get("MCPSERVER_PORT") or 8000 default_host = os.environ.get("MCPSERVER_HOST") or "0.0.0.0" @@ -22,7 +23,7 @@ def populate_start_args(start): start.add_argument( "-t", "--transport", - default="stdio", + default="http", help="Transport to use (defaults to stdin)", choices=["stdio", "http", "sse", "streamable-http"], ) @@ -33,6 +34,9 @@ def populate_start_args(start): help="Additional tool module paths to discover from.", default=[], ) + start.add_argument( + "--event", 
action="append", help="Direct event stream to import.", default=[] + ) start.add_argument("--tool", action="append", help="Direct tool to import.", default=[]) start.add_argument("--resource", action="append", help="Direct resource to import.", default=[]) start.add_argument("--prompt", action="append", help="Direct prompt to import.", default=[]) @@ -50,3 +54,86 @@ def populate_start_args(start): action="store_true", default=False, ) + + # Hub Group + hub_group = start.add_argument_group("šŸ¦ž Hub Mode") + hub_group.add_argument( + "--hub", + action="store_true", + help="Start the server in Hub mode to aggregate remote workers.", + ) + hub_group.add_argument( + "--hub-secret", + default=os.environ.get("MCP_HUB_SECRET"), + help="Secret key required for workers to register. (Auto-generated if omitted)", + ) + hub_group.add_argument( + "--batch", + default=None, + type=int, + help="make requests to workers in batches of X (e.g., for experiments)", + ) + hub_group.add_argument( + "--serial", + action="store_true", + help="Run the hub in serial mode (ideal for experiments on single machines)", + default=False, + ) + + # Worker Registration Group + worker_group = start.add_argument_group("šŸ Worker Registration") + worker_group.add_argument( + "--join", help="URL of the MCP Hub to join (e.g., http://hub-host:8089)" + ) + worker_group.add_argument( + "--join-secret", + help="The registration secret provided by the Hub.", + default=os.environ.get("MCPSERVER_JOIN_SECRET"), + ) + worker_group.add_argument( + "--worker-id", + help="Unique ID for this worker. Defaults to the hostname.", + default=socket.gethostname(), + ) + worker_group.add_argument( + "--public-url", + help="The URL the Hub should use to reach this worker (e.g. 
http://ip:port/mcp)", + ) + worker_group.add_argument( + "--verbose", + help="Request worker to send back a second block with provider calls", + action="store_true", + default=False, + ) + worker_group.add_argument( + "--label", + action="append", + dest="labels", + help="Custom labels in key=value format (e.g., --label gpu=h100). Can be used multiple times.", + ) + + # const=True is what we get if the flag is present but no value is given + # default=False is what we get if the flag is totally absent + # THe user can also ask for an archetype (hpc, cloud, standalone) + worker_group.add_argument( + "--mock", + nargs="?", + const=True, + default=False, + help="Start a mock worker. Can optionally select hpc, cloud, or standalone", + ) + + # Agent Reasoning Group + agent_group = start.add_argument_group("🧠 Agent Reasoning") + agent_group.add_argument( + "--llm-backend", + help="LLM provider (gemini, openai). Env: RESOURCE_SECRETARY_LLM", + ) + agent_group.add_argument( + "--llm-model", + help="Specific model name. Env: RESOURCE_SECRETARY_MODEL", + ) + agent_group.add_argument( + "--llm-api-base", + help="Base URL for the API (OpenAI/Local only). Env: RESOURCE_SECRETARY_API_BASE", + ) diff --git a/mcpserver/cli/manager.py b/mcpserver/cli/manager.py index 1b29345..d85a623 100644 --- a/mcpserver/cli/manager.py +++ b/mcpserver/cli/manager.py @@ -1,46 +1,53 @@ from mcpserver.core.config import MCPConfig from mcpserver.tools.manager import ToolManager -# Discover and register defaults +# Initialize the global ToolManager instance manager = ToolManager() -manager.register() -def get_manager(mcp, cfg): +def get_manager(mcp, cfg: MCPConfig): """ - Get the common tool manager and register tools. + Initializes the ToolManager and registers all configured tools and system identity. + + Inputs: + mcp (FastMCP): The MCP server instance. + cfg (MCPConfig): The loaded server configuration. + system_type (str): Optional legacy system type identifier. 
""" - # Add additional module paths (custom out of tree modules) - for path in cfg.discovery: - print(f"🧐 Registering additional module: {path}") - manager.register(path) - # explicit egistration - for endpoint in register(mcp, cfg): - print(f" āœ… Registered: {endpoint.name}") + # Load fleet tools + # This automatically boots the SystemTool and any discovery modules + print(f"šŸ“” Initializing System Identity...") + manager.load_fleet_tools(mcp, include=cfg.discovery) - # Load into the manager (tools, resources, prompts) - for tool in manager.load_tools(mcp, cfg.include, cfg.exclude): - print(f" āœ… Registered: {tool.name}") + # Handle explicit registration of specific paths (Tools, Prompts, Resources) + for endpoint, emoji in register_explicit_capabilities(mcp, cfg): + print(f" {emoji} Registered: {endpoint.name}") - # Visual to show user we have ssl + # Handle SSL if cfg.server.ssl_keyfile is not None and cfg.server.ssl_certfile is not None: print(f" šŸ” SSL Enabled") + return manager + -def register(mcp, cfg: MCPConfig): +def register_explicit_capabilities(mcp, cfg: MCPConfig): """ - Registers specific tools, prompts, and resources defined in the config. - Replaces the previous args-based register function. + Registers specific tools, prompts, and resources defined explicitly in the config. + + Inputs: + mcp (FastMCP): The MCP server instance. + cfg (MCPConfig): The loaded configuration object. 
""" - # Define which config lists map to which manager methods + # Map configuration lists to the manager's registration methods registries = [ - (cfg.tools, manager.register_tool), - (cfg.prompts, manager.register_prompt), - (cfg.resources, manager.register_resource), + (cfg.tools, manager.register_tool, "āœ…"), + (cfg.prompts, manager.register_prompt, "šŸ’¬"), + (cfg.resources, manager.register_resource, "ā›°ļø"), + (cfg.events, manager.register_event, "šŸ“”"), ] - for capability_list, register_func in registries: + for capability_list, register_func, emoji in registries: for item in capability_list: # item is a CapabilityConfig object with .path and .name - yield register_func(mcp, item.path, name=item.name) + yield register_func(mcp, item.path, name=item.name), emoji diff --git a/mcpserver/cli/start.py b/mcpserver/cli/start.py index 3e847a4..2ad0d9a 100644 --- a/mcpserver/cli/start.py +++ b/mcpserver/cli/start.py @@ -1,4 +1,7 @@ +import asyncio +import os import warnings +from contextlib import asynccontextmanager import uvicorn from fastapi import FastAPI @@ -9,15 +12,35 @@ "ignore", category=DeprecationWarning, module="uvicorn.protocols.websockets" ) - from mcpserver.app import init_mcp from mcpserver.cli.manager import get_manager from mcpserver.core.config import MCPConfig +from mcpserver.core.hub import HubManager +from mcpserver.core.worker import WorkerManager +from mcpserver.logger import logger # These are routes also served here from mcpserver.routes import * +def broadcast_llm(args): + """ + Broadcast LLM config via the environment. + + I think this is an OK way to do it. + """ + # 1. 
Inject Agent parameters into the environment + # These map the CLI flags we defined in populate_start_args to the library env vars + if args.llm_backend: + os.environ["RESOURCE_SECRETARY_LLM"] = args.llm_backend + + if args.llm_model: + os.environ["RESOURCE_SECRETARY_MODEL"] = args.llm_model + + if args.llm_api_base: + os.environ["RESOURCE_SECRETARY_API_BASE"] = args.llm_api_base + + def main(args, extra, **kwargs): """ Starts the MCP Gateway with the specified tools. @@ -25,10 +48,13 @@ def main(args, extra, **kwargs): """ if args.config is not None: print(f"šŸ“– Loading config from {args.config}") - cfg = MCPConfig.from_yaml(args.config) + cfg = MCPConfig.from_yaml(args.config, args) else: cfg = MCPConfig.from_args(args) + # Export LLM envars + broadcast_llm(args) + # Get the tool manager and register discovered tools mcp = init_mcp(cfg.exclude, cfg.include, args.mask_error_details) get_manager(mcp, cfg) @@ -37,6 +63,33 @@ def main(args, extra, **kwargs): mcp_app = mcp.http_app(path=cfg.server.path) app = FastAPI(title="MCP Server", lifespan=mcp_app.lifespan) + # Setup Hub (parent role) + if args.hub: + mcp.hub_manager = HubManager.from_args(mcp, args) + + # Setup Worker (child role) - triggered by --join. We require join secret. + if args.join: + if not args.join_secret: + logger.exit("A --join-secret is required to register with a hub.") + mcp.worker_manager = WorkerManager.from_args(mcp, args, cfg) + mcp_app = mcp.http_app(path=cfg.server.path) + + @asynccontextmanager + async def lifespan(app: FastAPI): + # startup logic for Worker registration + if hasattr(mcp, "worker_manager"): + asyncio.create_task(mcp.worker_manager.run_registration()) + + # Execute FastMCP's internal lifespan context + async with mcp_app.router.lifespan_context(app): + yield + + app = FastAPI(title="MCP Server", lifespan=lifespan) + + # Bind the /register endpoint if we are a Hub + if args.hub: + mcp.hub_manager.bind_to_app(app) + # Mount the MCP server. 
Note from V: we can use mount with another FastMCP # mcp.run can also be replaced with mcp.run_async app.mount("/", mcp_app) diff --git a/mcpserver/core/config.py b/mcpserver/core/config.py index 5bbc483..43ddd1d 100644 --- a/mcpserver/core/config.py +++ b/mcpserver/core/config.py @@ -1,4 +1,4 @@ -from dataclasses import dataclass, field +from dataclasses import dataclass, field, fields, replace from typing import Any, Dict, List, Optional import yaml @@ -50,23 +50,33 @@ class MCPConfig: exclude: Optional[str] = None discovery: List[str] = field(default_factory=list) tools: List[Capability] = field(default_factory=list) + events: List[Capability] = field(default_factory=list) prompts: List[Capability] = field(default_factory=list) resources: List[Capability] = field(default_factory=list) @classmethod - def from_yaml(cls, path: str): + def from_yaml(cls, path: str, args=None): with open(path, "r") as f: data = yaml.safe_load(f) or {} - return cls.from_dict(data) + return cls.from_dict(data, vars(args) if args else {}) @classmethod - def from_dict(cls, data: Dict[str, Any]): - """Helper to recursively build dataclasses from a dictionary.""" + def from_dict(cls, data: Dict[str, Any], args=None): + """ + Helper to recursively build dataclasses from a dictionary.
+ """ + args = args or {} # Build ServerConfig server_data = data.get("server", {}) server_cfg = ServerConfig(**server_data) + # Command line takes precedence + field_names = {field.name for field in fields(ServerConfig)} + filtered_args = {k: v for k, v in args.items() if k in field_names} + server_cfg = replace(server_cfg, **filtered_args) + # Build Settings (Flattened in the dataclass) settings = data.get("settings", {}) @@ -81,6 +91,7 @@ def make_caps(key): exclude=settings.get("exclude"), discovery=data.get("discovery", []), tools=make_caps("tools"), + events=make_caps("events"), prompts=make_caps("prompts"), resources=make_caps("resources"), ) @@ -103,6 +114,7 @@ def from_args(cls, args): exclude=args.exclude, discovery=args.tool_module or [], tools=[Capability(path=t) for t in (args.tool or [])], + events=[Capability(path=e) for t in (args.event or [])], prompts=[Capability(path=p) for p in (args.prompt or [])], resources=[Capability(path=r) for r in (args.resource or [])], ) diff --git a/mcpserver/core/hub.py b/mcpserver/core/hub.py new file mode 100644 index 0000000..b52154b --- /dev/null +++ b/mcpserver/core/hub.py @@ -0,0 +1,358 @@ +import asyncio +import json +import random +import secrets +import time +from typing import Any, Dict, Optional + +from fastmcp import Client +from mcp.types import Tool +from rich import print + +import mcpserver.utils as utils +from mcpserver.logger import logger + + +class HubManager: + """ + A hub manager is a central coordinator that aggregates worker clusters, + reflects their tools, and manages federated job negotiation. 
+ """ + + def __init__(self, mcp, host: str, port: int, secret: str = None, batch=None, serial=False): + self.mcp = mcp + self.host = host + self.port = port + self.secret = secret or secrets.token_urlsafe(32) + self.workers: Dict[str, Dict[str, Any]] = {} + + # Make requests to hub in batches, in serial, or in parallel + self.set_running_mode(batch, serial) + + # Track registered proxies to prevent ValueError on worker re-registration + self._registered_proxies = set() + + self.registration_url = f"http://{host}:{port}/register" + self._print_banner() + self._register_hub_tools() + + def set_running_mode(self, batch_size=None, serial=False): + """ + Set the function to call the fleet. + If we are worried about rate limits or running experiments, + we should be sure to run in small batches. + """ + # Set the fleet engine to run full parallel + self.semaphore = None + self.run_on_fleet = self.run_on_fleet_parallel + + if serial: + logger.info(f"⚔ Hub initialized in serial mode") + self.run_on_fleet = self.run_on_fleet_serial + return + + elif not batch_size or batch_size <= 0: + logger.info(f"⚔ Hub initialized in full Parallel mode") + return + + # Set the fleet engine to use the semaphore + self.batch_size = batch_size + self.semaphore = asyncio.Semaphore(batch_size) + self.run_on_fleet = self.run_on_fleet_batched + logger.info(f"🚦 Hub initialized with Batch Size: {batch_size}") + + @classmethod + def from_args(cls, mcp, args) -> Optional["HubManager"]: + """ + Create a HubManager from CLI arguments. + """ + if not getattr(args, "hub", False): + return None + return cls( + mcp, + host=args.host, + port=args.port, + secret=args.hub_secret, + batch=args.batch, + serial=args.serial, + ) + + def _print_banner(self): + """ + Print hub connection info for workers. 
+ """ + print(f"\nšŸ›”ļø Hub Mode Active") + print(f" Master Secret: {self.secret}") + print(" Workers must use this secret to join the hub") + print(f" mcpserver start --join {self.registration_url}\n") + + async def run_on_fleet_parallel(self, action_fn) -> Dict[str, Any]: + """ + Run parallel sessions across all workers. + action_fn: An async function that takes (worker_id, session) and returns data. + """ + + async def _safe_wrapper(wid, info): + try: + async with info["client"] as sess: + return wid, await action_fn(wid, sess) + except Exception as e: + return wid, {"type": "error", "message": str(e)} + + if not self.workers: + return {} + + # Parallel execution of all worker actions + results = await asyncio.gather(*[_safe_wrapper(w, i) for w, i in self.workers.items()]) + return dict(results) + + async def run_on_fleet_serial(self, action_fn) -> Dict[str, Any]: + """ + Run sessions across all workers one by one (sequentially). + """ + results = {} + if not self.workers: + return results + + for wid, info in self.workers.items(): + try: + # await each one so we wait for worker to return + async with info["client"] as sess: + results[wid] = await action_fn(wid, sess) + except Exception as e: + results[wid] = {"type": "error", "message": str(e)} + + return results + + async def run_on_fleet_batched(self, action_fn) -> Dict[str, Any]: + """ + Execute on workers using a semaphore to stay under rate limits. 
+ """ + + async def _safe_wrapper(wid, info): + # Wait for a spot in the semaphore + async with self.semaphore: + try: + # Add a micro-jitter (100-300ms) to prevent perfect bursts + await asyncio.sleep(random.uniform(0.1, 0.3)) + async with info["client"] as sess: + return wid, await action_fn(wid, sess) + except Exception as e: + return wid, {"type": "error", "message": str(e)} + + if not self.workers: + return {} + results = await asyncio.gather(*[_safe_wrapper(w, i) for w, i in self.workers.items()]) + return dict(results) + + def _register_hub_tools(self): + """ + Registers tools that the Hub itself provides to users/agents. + """ + + @self.mcp.tool(name="get_fleet_status") + async def get_fleet_status() -> dict: + """ + Aggregate Level 1 (Static Manifest + Basic Status) from all workers. + """ + if not self.workers: + return {"message": "No workers registered."} + return await self.fetch_all_statuses() + + @self.mcp.tool(name="dispatch_job") + async def dispatch_job(worker_id: str, prompt: str) -> dict: + """ + Directly targets a specific worker to execute a job. + """ + info = self.workers.get(worker_id) + if not info: + return {"error": f"Worker {worker_id} not found."} + + async with info["client"] as sess: + result = await sess.call_tool("submit", {"request": prompt}) + return json.loads(utils.extract_code_block(result.content[0].text)) + + @self.mcp.tool(name="negotiate_job") + async def negotiate_job(prompt: str) -> dict: + """ + Broadcast a job request to all worker Secretaries in parallel. + Wakes up local reasoning loops for Level 2 dynamic evaluation. + """ + if not self.workers: + return {"error": "No workers registered in fleet."} + return await self.broadcast_negotiation(prompt) + + @self.mcp.tool(name="export_fleet_truth") + async def export_fleet_truth() -> dict: + """ + Collects internal mock metadata (ground truth) from all workers. 
+ Used for accuracy experiments to compare against agent findings, + but you could also use it for a real worker. + """ + if not self.workers: + return {"error": "No workers registered."} + + async def truth_handler(wid, sess): + mcp_result = await sess.call_tool("export_provider_metadata", {}) + return json.loads(mcp_result.content[0].text) + + results = await self.run_on_fleet(truth_handler) + return {"timestamp": time.time(), "ground_truth": results} + + async def broadcast_negotiation(self, prompt: str) -> dict: + """ + Uses the Fleet Engine to invoke Agentic Secretaries on all children. + """ + + async def negotiate_handler(wid, sess): + # Check for Level 2 support (Secretary Agent) + tools = await sess.list_tools() + has_secretary = any(t.name == "ask_secretary" for t in tools) + + if has_secretary: + # Invoke the Agentic Secretary + mcp_result = await sess.call_tool("ask_secretary", {"request": prompt}) + raw_text = mcp_result.content[0].text + + try: + # Parse and handle potential quote issues in LLM JSON + proposal_data = json.loads(utils.extract_code_block(raw_text)) + except Exception: + proposal_data = {"proposal_text": raw_text} + + return { + "type": "agentic_proposal", + "data": proposal_data, + "status": utils.extract_code_block(raw_text), + } + else: + # Fallback to manifest only + mcp_result = await sess.call_tool("get_status", {}) + return { + "type": "manifest_only", + "reasoning": "Worker has no Secretary Agent. Providing static metadata.", + "data": mcp_result.content[0].text, + } + + start_time = time.time() + results = await self.run_on_fleet(negotiate_handler) + + return { + "negotiation_id": secrets.token_hex(4), + "timestamp": start_time, + "user_prompt": prompt, + "proposals": results, + } + + async def fetch_all_statuses(self) -> dict: + """ + Collect aggregate telemetry from all workers using the Fleet Engine.
+ """ + + async def status_handler(wid, sess): + info = self.workers[wid] + base_metadata = { + "labels": info.get("labels", {}), + "url": info["url"], + } + + mcp_result = await sess.call_tool("get_status", {}) + raw_text = mcp_result.content[0].text + try: + # Note: Preserving the replace hack from original for status data + status_data = json.loads(raw_text.replace("'", '"')) + except: + status_data = raw_text + + return { + **base_metadata, + "online": True, + "status": status_data, + } + + return await self.run_on_fleet(status_handler) + + def bind_to_app(self, app): + """ + Binds the Hub registration endpoint to the FastAPI app. + """ + from fastapi import HTTPException, Request + + @app.post("/register") + async def register(request: Request): + if not secrets.compare_digest(request.headers.get("X-MCP-Token", ""), self.secret): + raise HTTPException(status_code=403) + + data = await request.json() + wid, wurl = data["id"], data["url"] + + self.workers[wid] = { + "url": wurl, + "client": Client(wurl), + "labels": data.get("labels", {}), + } + + # Discover tools in the background + asyncio.create_task(self._reflect_child_tools(wid, wurl)) + return {"status": "success"} + + async def _reflect_child_tools(self, worker_id: str, url: str): + """ + Discover tools from the worker and create local proxies. + """ + try: + async with Client(url) as client: + tools = await client.list_tools() + for tool in tools: + self._create_proxy(worker_id, tool) + except Exception as e: + logger.error(f"Failed to reflect tools for {worker_id}: {e}") + + def _create_proxy(self, worker_id: str, tool: Tool): + """ + Dynamically creates a Hub-level tool that proxies to a specific worker. 
+ """ + proxy_name = f"{utils.sanitize(worker_id)}_{utils.sanitize(tool.name)}" + + if proxy_name in self._registered_proxies: + print(f"šŸ›°ļø Re-discovered worker tool: [blue]{proxy_name}[/blue]") + return + + properties = tool.inputSchema.get("properties", {}) + arg_mapping = {utils.sanitize(k): k for k in properties.keys()} + arg_string = ", ".join([f"{safe_name}=None" for safe_name in arg_mapping.keys()]) + + exec_globals = { + "Client": Client, + "hub": self, + "worker_id": worker_id, + "target_tool": tool.name, + "arg_mapping": arg_mapping, + "logger": logger, + } + namespace = {} + + # The function resolves the current worker URL at call-time + func_def = ( + f"async def {proxy_name}({arg_string}):\n" + f" info = hub.workers.get(worker_id)\n" + f" if not info:\n" + f" return {{'error': f'Worker {{worker_id}} no longer registered'}}\n" + f" url = info['url']\n" + f" raw_locals = locals()\n" + f" args = {{arg_mapping[k]: raw_locals[k] for k in arg_mapping if raw_locals[k] is not None}}\n" + f" async with Client(url) as client:\n" + f" return await client.call_tool(target_tool, args)" + ) + + try: + exec(func_def, exec_globals, namespace) + proxy_func = namespace[proxy_name] + proxy_func.__doc__ = tool.description + + self.mcp.tool(name=proxy_name)(proxy_func) + self._registered_proxies.add(proxy_name) + print(f"šŸ›°ļø Discovered worker tool: [blue]{proxy_name}[/blue]") + + except Exception as e: + logger.error(f"āŒ Failed to generate dynamic proxy for {tool.name}: {e}") diff --git a/mcpserver/core/worker.py b/mcpserver/core/worker.py new file mode 100644 index 0000000..1363357 --- /dev/null +++ b/mcpserver/core/worker.py @@ -0,0 +1,225 @@ +import asyncio +import collections +import json +import socket +import time +from typing import Any, Dict, Optional + +import httpx +from resource_secretary.providers import discover_providers +from resource_secretary.providers.mock import discover_mock_providers +from rich import print + +import mcpserver.utils as utils 
+from mcpserver.logger import logger + + +class WorkerManager: + """ + A generic worker mcpserver that discovers its own capabilities + and context using the resource-secretary library. + """ + + def __init__( + self, + mcp, + hub_url: str, + secret: str, + worker_id: Optional[str] = None, + public_url: Optional[str] = None, + labels: Optional[list] = None, + mock: Optional[bool] = False, + verbose: Optional[bool] = False, + ): + self.mcp = mcp + self.hub_url = hub_url + self.secret = secret + self.worker_id = worker_id or socket.gethostname() + self.public_url = public_url + self.init_providers(mock) + self.verbose = verbose + self.show() + + # Static Manifest for the worker + self.manifest = self.build_manifest() + + # Note from vsoch: not sure if this will be useful / what we should use it for. + self.labels = self.parse_labels(labels) + + # Register MCP Tools automatically + self.register_agent_tools() + + def init_providers(self, mock=False): + """ + Probe the local system on startup. E.g., "we found spack, flux, etc." + These can be faux (mock) or real discovered providers. + """ + logger.info("šŸ“” Probing local system for resource providers...") + if mock: + self.catalog = discover_mock_providers(self.worker_id, choice=mock) + else: + self.catalog = discover_providers() + + def show(self): + """ + Show installed providers and the verbosity mode. + """ + for category, providers in self.catalog.items(): + names = ", ".join([p.name for p in providers]) + print(f" [purple]{category.rjust(10)}[/purple] {names}") + if self.verbose: + logger.info("šŸ“¢ Running in verbose mode. Secretary negotiate will return a calls block.") + print() + + def build_manifest(self) -> Dict[str, Any]: + """ + Flattens the discovered provider objects into a static JSON manifest.
+ """ + manifest = {} + for category, instances in self.catalog.items(): + manifest[category] = {inst.name: inst.metadata for inst in instances} + return manifest + + def parse_labels(self, label_list: Optional[list]) -> dict: + """ + Converts ['key=val', 'key2=val2'] to a dictionary. + """ + labels = {} + if not label_list: + return labels + for item in label_list: + if "=" in item: + k, v = item.split("=", 1) + labels[k.strip()] = v.strip() + return labels + + def register_agent_tools(self): + """ + Registers the core negotiation tools with the FastMCP instance. + """ + + @self.mcp.tool(name="get_status") + async def get_status() -> dict: + """ + Returns the Level 1 Static Manifest of this cluster. + Use this to verify hardware, software providers, and site info. + """ + return { + "worker_id": self.worker_id, + "timestamp": time.time(), + "manifest": self.manifest, + } + + @self.mcp.tool(name="ask_secretary") + async def ask_secretary(request: str) -> dict: + """ + Wakes up the local Secretary Agent to perform a Level 2 investigation. + Use this to ask about specific software availability, queue depth, or node health. + """ + from resource_secretary.agents.secretary import SecretaryAgent + + # Flatten the catalog into a list of active provider instances + active_providers = [inst for category in self.catalog.values() for inst in category] + + # Verbose mode returns a second block with CALLS + agent = SecretaryAgent(active_providers, verbose=self.verbose) + proposal = await agent.negotiate(request) + return {"worker_id": self.worker_id, "proposal": proposal} + + @self.mcp.tool(name="submit") + async def receive_job(request: str) -> dict: + """ + Receive a job. Accepts a job request, invokes the local Secretary to + generate a spec, submit it, and verify the job ID. 
+ """ + from resource_secretary.agents.secretary import SecretaryAgent + + active_providers = [inst for cat in self.catalog.values() for inst in cat] + + agent = SecretaryAgent(active_providers) + raw_result = await agent.submit(request) + try: + receipt = json.loads(utils.extract_code_block(raw_result)) + except: + receipt = {"status": "FAILED", "reasoning": raw_result} + + return {"worker_id": self.worker_id, "receipt": receipt} + + @self.mcp.tool(name="export_provider_metadata") + def export_provider_metadata() -> str: + """ + Iterates through all providers and returns their internal 'truth' state. + This tool is 'hidden' from the Secretary Agent but used by the Hub. + """ + truth_map = {} + tool_registry = collections.defaultdict(list) + + # Self.catalog is a dict: {"software": [MockSpackProvider, ...]} + for category, providers in self.catalog.items(): + truth_map[category] = {} + for p in providers: + # We check if the provider has the export_truth method + if hasattr(p, "export_truth"): + truth_map[category][p.name] = p.export_truth() + else: + # Fallback to standard metadata if not a mock + truth_map[category][p.name] = p.metadata + + # Capture all Secretary Tools for this provider + # We can use this for simulations to assess what the agent + # should have called (vs. what it did) + manifest = p.discover_tools(tool_types=["secretary"]) + for tool_name in manifest.keys(): + tool_registry[category].append(f"{p.name}.{tool_name}") + + metadata = {"truth": truth_map, "registry": dict(tool_registry)} + + # If we have an archetype (mocking something) save it + if hasattr(p, "archetype"): + metadata["metadata"] = {"archetype": p.archetype.name} + return json.dumps(metadata, indent=2) + + async def run_registration(self): + """ + Registers the worker with the Hub. + Sends the Level 1 Manifest so the Hub knows exactly what resources are here. 
+ """ + await asyncio.sleep(1) + async with httpx.AsyncClient() as client: + payload = { + "id": self.worker_id, + "url": self.public_url, + "labels": self.labels, + "manifest": self.manifest, + } + headers = {"X-MCP-Token": self.secret} + try: + res = await client.post(f"{self.hub_url}/register", json=payload, headers=headers) + res.raise_for_status() + logger.info( + f"āœ… Registered as '{self.worker_id}' with {len(self.manifest)} categories discovered." + ) + except Exception as e: + logger.error(f"āŒ Registration failed: {e}") + + @classmethod + def from_args(cls, mcp, args, cfg) -> Optional["WorkerManager"]: + """ + Factory to create a WorkerManager from CLI arguments. + """ + if not getattr(args, "join", None): + return None + + default_url = f"http://{cfg.server.host}:{cfg.server.port}{cfg.server.path}" + public_url = args.public_url or default_url + + return cls( + mcp, + mock=args.mock, + hub_url=args.join, + secret=args.join_secret, + worker_id=args.worker_id, + public_url=public_url, + labels=args.labels, + verbose=args.verbose, + ) diff --git a/mcpserver/events/__init__.py b/mcpserver/events/__init__.py new file mode 100644 index 0000000..37e003e --- /dev/null +++ b/mcpserver/events/__init__.py @@ -0,0 +1,26 @@ +from typing import Optional + +from .events import SubscriptionManager + +_event_manager: Optional[SubscriptionManager] = None + + +def get_event_manager() -> SubscriptionManager: + """ + Lazy-getter for the singleton manager. + Ensures we only have one instance handling the background tasks. + """ + global _event_manager + if _event_manager is None: + _event_manager = SubscriptionManager() + return _event_manager + + +def has_event_manager() -> bool: + """ + Check if the manager has been initialized. 
+ """ + return _event_manager is not None + + +__all__ = ["get_event_manager", "has_event_manager"] diff --git a/mcpserver/events/events.py b/mcpserver/events/events.py new file mode 100644 index 0000000..a80d012 --- /dev/null +++ b/mcpserver/events/events.py @@ -0,0 +1,93 @@ +import asyncio +import inspect +from typing import Any, Awaitable, Callable, Dict, List + + +class SubscriptionManager: + """ + Generic Manager that proxies MCP calls to external Event Classes. + Does not require inheritance, only expects specific method signatures. + """ + + def __init__(self): + # provider_name -> Instance of the external Event Class + self._providers: Dict[str, Any] = {} + # Track which provider owns which sub_id for routing unsubscribes + self._sub_to_provider: Dict[str, str] = {} + + def register_provider(self, name: str, instance: Any): + """ + Validates and registers an external class instance. + """ + required_methods = ["get_metadata", "subscribe", "unsubscribe"] + for method in required_methods: + if not hasattr(instance, method) or not callable(getattr(instance, method)): + raise TypeError(f"Event provider '{name}' must implement '{method}'") + + self._providers[name] = instance + + def list_event_streams(self) -> List[Dict[str, Any]]: + """ + Aggregates metadata and docstrings from all registered external classes. 
+ """ + results = [] + for name, instance in self._providers.items(): + # Get the base metadata provided by the class + meta = instance.get_metadata() + + # Sniff the docstrings from the subscribe method if metadata is thin + subscribe_method = getattr(instance, "subscribe") + doc = inspect.getdoc(subscribe_method) or meta.get("description", "") + + results.append( + { + "provider": name, + "description": doc, + "parameters": meta.get("parameters", {}), + "capabilities": meta.get("capabilities", []), + } + ) + return results + + async def subscribe( + self, + provider_name: str, + params: Dict[str, Any], + notify_fn: Callable[[str, Dict[str, Any]], Awaitable[None]], + ) -> str: + """ + Proxies the subscription request to the specific class. + """ + if provider_name not in self._providers: + raise ValueError(f"Provider '{provider_name}' not found.") + + instance = self._providers[provider_name] + + # We pass the notify_fn to the class so it can push events back + sub_id = await instance.subscribe(params, notify_fn) + + self._sub_to_provider[sub_id] = provider_name + return sub_id + + async def unsubscribe(self, sub_id: str) -> bool: + """ + Finds the owner of the sub_id and tells it to stop. + """ + provider_name = self._sub_to_provider.get(sub_id) + if not provider_name: + return False + + instance = self._providers[provider_name] + success = await instance.unsubscribe(sub_id) + + if success: + del self._sub_to_provider[sub_id] + return success + + async def cleanup_all(self): + """ + Iterates through all active sub_ids and unsubscribes them. 
+ """ + ids = list(self._sub_to_provider.keys()) + for sub_id in ids: + await self.unsubscribe(sub_id) diff --git a/mcpserver/events/tools.py b/mcpserver/events/tools.py new file mode 100644 index 0000000..9d1c51b --- /dev/null +++ b/mcpserver/events/tools.py @@ -0,0 +1,110 @@ +from typing import Any, Dict, List + +from fastmcp import Context +from mcp.types import JSONRPCNotification + +from mcpserver.events import get_event_manager + + +# This shim solves a bug with "Multiple values for keyword argument jsonrpc error +# by hiding the jsonrpc field during the serialization. +class SilentNotification(JSONRPCNotification): + def model_dump(self, *args, **kwargs) -> dict[str, Any]: + d = super().model_dump(*args, **kwargs) + # Remove it so the SDK can add it back + d.pop("jsonrpc", None) + return d + + +async def list_event_streams() -> List[Dict[str, Any]]: + """ + Discovery tool to list all available reactive event providers and their requirements. + + Returns a list of providers (e.g., 'kubernetes', 'flux') along with: + 1. A description of the events they track. + 2. The specific 'parameters' dictionary keys required to filter events. + + Agents should call this first to understand what events can be subscribed to + and what filters (like namespace, job_name, or resource_type) are valid. + """ + return get_event_manager().list_event_streams() + + +async def subscribe(provider_name: str, params: Dict[str, Any], ctx: Context) -> Dict[str, str]: + """ + Subscribes to an asynchronous event stream from a specific provider. + + Once called, the server will start a background watcher. When relevant events occur, + the server will push 'notifications/event' messages to the agent. + + Arguments: + provider_name: The name of the provider (e.g., 'flux' or 'kubernetes') + retrieved from list_event_streams. + params: A dictionary of filters for the subscription. 
+ Example for Flux: {"app_name": "lammps", "job_name": "run-1"} + Example for K8s: {"resource_type": "pods", "namespace": "default"} + Refer to list_event_streams for required/optional keys. + + Returns: + A dictionary containing the 'subscription_id'. This ID is required to + identify incoming notifications and to later unsubscribe. + """ + manager = get_event_manager() + + # Validation: Ensure the provider exists before attempting logic + available = [p["provider"] for p in manager.list_event_streams()] + if provider_name not in available: + return { + "error": f"Provider '{provider_name}' not found. Available: {available}", + "status": "failed", + } + + # Internal bridge to route class-level events into MCP JSON-RPC notifications + async def mcp_notify_bridge(sub_id: str, data: dict): + if ctx and hasattr(ctx, "session") and ctx.session: + notification = SilentNotification.model_construct( + method="notifications/message", + params={ + "level": "info", + "data": { + "subscription_id": sub_id, + "provider": provider_name, + "data": data, + }, + }, + ) + try: + # Now we pass the single object as required + await ctx.session.send_notification(notification) + except Exception as e: + print(f"āŒ Failed to send notification: {e}") + + try: + sub_id = await manager.subscribe(provider_name, params, mcp_notify_bridge) + return { + "subscription_id": sub_id, + "status": "subscribed", + "message": f"Successfully subscribed to {provider_name}. Watch for notifications.", + } + except Exception as e: + return {"error": str(e), "status": "failed"} + + +async def unsubscribe(subscription_id: str) -> Dict[str, Any]: + """ + Terminates an active event subscription and stops the background watcher. + + Arguments: + subscription_id: The unique ID returned during the initial 'subscribe' call. + + Returns: + A status message indicating if the subscription was successfully closed. 
+ """ + success = await get_event_manager().unsubscribe(subscription_id) + if success: + return {"status": "success", "message": f"Subscription {subscription_id} has been closed."} + else: + return { + "status": "error", + "message": f"Subscription {subscription_id} not found or already closed.", + } diff --git a/mcpserver/tools/base.py b/mcpserver/tools/base.py index 2233206..0bb26a3 100644 --- a/mcpserver/tools/base.py +++ b/mcpserver/tools/base.py @@ -9,8 +9,15 @@ class BaseTool(ABC): Each tool can provision prompts, resources, or tools. """ - def setup(self): - pass + def setup(self, manager=None): + self.manager = manager + + def get_status(self) -> dict: + """ + Optional: Override this to provide custom status + information for this specific tool set. + """ + return {} def get_mcp_tools(self) -> List[Callable]: return self.get_mcp_methods("_is_mcp_tool") diff --git a/mcpserver/tools/manager.py b/mcpserver/tools/manager.py index b6c75dd..201f220 100644 --- a/mcpserver/tools/manager.py +++ b/mcpserver/tools/manager.py @@ -1,220 +1,209 @@ import importlib import inspect -import os -import re -from pathlib import Path -from typing import Dict +from typing import Any, Dict, List, Optional, Set from fastmcp.prompts import Prompt from fastmcp.resources import Resource - -# These are the function types we want to discover from fastmcp.tools import Tool +from mcpserver.events import get_event_manager + from .base import BaseTool +from .system.system import SystemTool class ToolManager: + """ + Top-level manager for tool registration. + Worker tools are registered with clean names; Hub handles namespacing. + """ def __init__(self): - self.tools = {} + self.tools: Dict[str, Dict[str, Any]] = {} + self.instances: Dict[str, BaseTool] = {} + self.registered_keys: Set[str] = set() + self._events_initialized = False + + def register_instance_with_mcp(self, mcp, instance: BaseTool): + """ + Maps decorated methods from a tool instance to FastMCP endpoints. 
+ """ + mapping = {Tool: mcp.add_tool, Resource: mcp.add_resource, Prompt: mcp.add_prompt} + + for ToolClass, add_func in mapping.items(): + type_prefix = ToolClass.__name__.lower() + method_name = f"get_mcp_{type_prefix}s" + getter = getattr(instance, method_name, None) + + if not getter: + continue + + for func in getter(): + name = getattr(func, "_mcp_name", func.__name__) + + # Check for duplicate registration within this process + unique_key = f"{type_prefix}:{name}" + if unique_key in self.registered_keys: + continue - def load_function(self, tool_path): + endpoint = ToolClass.from_function(func, name=name) + try: + add_func(endpoint) + self.registered_keys.add(unique_key) + except Exception as e: + print(f"āš ļø Failed to register {unique_key}: {e}") + + def load_fleet_tools(self, mcp, include: Optional[List[str]] = None): """ - Assume this is the function name provided + The standard loader for an agentic worker. """ - module_path, function = tool_path.rsplit(".", 1) + # Initialize and register the local SystemTool + system = SystemTool() + system.name = "system" + system.setup(manager=self) + self.instances["system"] = system + self.register_instance_with_mcp(mcp, system) + + # Optional user-defined tool modules (maybe we don't need) + if not include: + return + + for path in include: + if "mcpserver.tools.system" in path: + continue + self.load_and_register_module(mcp, path) + + def load_and_register_module(self, mcp, module_path: str): + """ + Load and register a module. 
+ """ + try: + module = importlib.import_module(module_path) + for _, obj in inspect.getmembers(module): + if inspect.isclass(obj) and issubclass(obj, BaseTool) and obj is not BaseTool: + name = module_path.split(".")[-1] + if name in self.instances: + continue + + inst = obj() + inst.name = name + inst.setup(manager=self) + self.instances[name] = inst + self.register_instance_with_mcp(mcp, inst) + except Exception as e: + print(f"āŒ Could not load extra tool module at {module_path}: {e}") + + def load_function(self, tool_path: str): + """ + Load a function. Worst docstring ever. I'm tired. + """ + module_path, function_name = tool_path.rsplit(".", 1) module = importlib.import_module(module_path) - return getattr(module, function) + return getattr(module, function_name) def register_tool(self, mcp, tool_path: str, name: str = None): """ - Register an mcp function directly. + Register a tool. """ + from fastmcp.tools import Tool + func = self.load_function(tool_path) - endpoint = Tool.from_function(func, name=name or func.__name__) + actual_name = name or func.__name__ + if f"tool:{actual_name}" in self.registered_keys: + return + endpoint = Tool.from_function(func, name=actual_name) mcp.add_tool(endpoint) + self.registered_keys.add(f"tool:{actual_name}") return endpoint def register_resource(self, mcp, tool_path: str, name: str = None): """ - Register an mcp resource directly. + Register a resource. + + Note from vsoch: I haven't tried any resources yet. 
""" + from fastmcp.resources import Resource + func = self.load_function(tool_path) - endpoint = Resource.from_function(func, name=name or func.__name__) + actual_name = name or func.__name__ + if f"resource:{actual_name}" in self.registered_keys: + return + endpoint = Resource.from_function(func, name=actual_name) mcp.add_resource(endpoint) + self.registered_keys.add(f"resource:{actual_name}") return endpoint def register_prompt(self, mcp, tool_path: str, name: str = None): """ - Register an mcp resource directly. + Register a prompt. + + Note from vsoch: In practice, I'm not sure I find server prompts useful. """ + from fastmcp.prompts import Prompt + func = self.load_function(tool_path) - endpoint = Prompt.from_function(func, name=name or func.__name__) + actual_name = name or func.__name__ + if f"prompt:{actual_name}" in self.registered_keys: + return + endpoint = Prompt.from_function(func, name=actual_name) mcp.add_prompt(endpoint) + self.registered_keys.add(f"prompt:{actual_name}") return endpoint - def register(self, module_name: str = "mcpserver.tools"): - """ - Discover and register tools from a module path. - - Note that we don't actually load mcp functions here. - They are loaded on demand based on the user start - request. Here we just keep track of discovered - contenders. - """ - # This needs to fail if we can't find it. - module = importlib.import_module(module_name) - if isinstance(module.__path__, list): - root_path = Path(module.__path__[0]).resolve() - # NamespacePath - else: - root_path = Path(module.__path__._path[0]).resolve() - self.tools.update(self.discover_tools(root_path, module_name)) - - def discover_tools(self, root_path: str, module_path: str) -> Dict[str, Path]: + def register_event(self, mcp, class_path: str, name: str = None): """ - Walks the directory tree to load tool metadata + Loads an external Event Class (e.g. FluxEvents), validates it, + and registers it with the SubscriptionManager. 
""" - discovered = {} - module_path = module_path.replace(os.sep, ".") - - # Recursive glob for all .py files - for file_path in root_path.rglob("*.py"): - if file_path.name != "tool.py": - continue - - # Calculate the relative path from 'tools/' - # e.g., kubernetes/deploy/job.py - rel_path = file_path.relative_to(root_path) - - # Assemble the module name - parts = list(rel_path.parts) - parts[-1] = os.path.splitext(parts[-1])[0] - import_path = module_path + "." + ".".join(parts) - - # Create the ID: kubernetes-deploy-job - # We strip the .py extension and replace slashes with dashes - tool_id = str(rel_path.with_suffix("")) - for repl in [[os.sep, "-"], ["-tool", ""], ["_", "-"]]: - tool_id = tool_id.replace(repl[0], repl[1]) - discovered[tool_id] = {"path": file_path, "module": import_path, "root": root_path} - return discovered - - def load_tools(self, mcp, include=None, exclude=None): - """ - Load a set of named tools, or default to all those discovered. - """ - # If no tools are selected... select all tools discovered - names = self.tools - include = "(%s)" % "|".join(include) if include else None - exclude = "(%s)" % "|".join(exclude) if exclude else None - - to_load = {} - for name in names: - # Prefix matching is more flexible than tag matching - matches = {k: v for k, v in self.tools.items() if name in k} - if not matches: - print(f"āš ļø No tools match pattern: '{name}'") - to_load.update(matches) - - # Load and Register a tool module - for name in to_load: - - # Inclusion and exclusion - if include and not re.search(include, name): - continue - if exclude and re.search(exclude, name): - continue - - # This is a tool instance. A tool instance can have 1+ functions - instance = self.load_tool(name) - if not instance: - continue - - # Add tools, resources, and prompts on the fly - for ToolClass in [Tool, Resource, Prompt]: - tooltype = ToolClass.__name__.lower() - getfunc = getattr(instance, f"get_mcp_{tooltype}s", None) + try: + # 1. 
Load the class + module_path, class_name = class_path.rsplit(".", 1) + module = importlib.import_module(module_path) + cls = getattr(module, class_name) - # Skip if the imlpementer did not add the class - if not getfunc: - continue + if not inspect.isclass(cls): + raise TypeError(f"{class_path} is not a class.") - # Get the decorated functions - for func in getfunc(): + # 2. Instantiate and Validate + instance = cls() + required = ["get_metadata", "subscribe", "unsubscribe"] + for req in required: + if not hasattr(instance, req): + raise AttributeError(f"Event class {class_name} missing required method: {req}") - # This is how we handle dynamic loading - endpoint = ToolClass.from_function(func, name=func._mcp_name) + # 3. Register with the Event Manager + provider_name = name or class_name.lower().replace("events", "") + manager = get_event_manager() + manager.register_provider(provider_name, instance) - # @mcp.tool - if ToolClass == Tool: - mcp.add_tool(endpoint) + # Ensure the provider exposes a "name" attribute for the server to print + if not hasattr(instance, "name"): + instance.name = provider_name - # @mcp.prompt - elif ToolClass == Prompt: - mcp.add_prompt(endpoint) + # 4. Inject the 3 core MCP tools if this is the first event registered + if not self._events_initialized: + self._register_core_event_tools(mcp) + self._events_initialized = True + return instance - # @mcp.resource - else: - mcp.add_resource(endpoint) - yield endpoint + except Exception as e: + print(f"āŒ Failed to register event provider at {class_path}: {e}") - def load_tool(self, tool_id: str) -> BaseTool: + def _register_core_event_tools(self, mcp): """ - Load a single tool (the actual module) based on finding BaseTool. + Adds the list, subscribe, and unsubscribe tools to FastMCP.
""" - # Convert filesystem path to python module notation - relative_module = self.tools[tool_id]["module"] + from fastmcp.tools import Tool - try: - module = importlib.import_module(relative_module) - - # Find the class that inherits from BaseTool - for _, obj in inspect.getmembers(module): - if inspect.isclass(obj) and issubclass(obj, BaseTool) and obj is not BaseTool: + from mcpserver.events import tools as event_tools - # Instantiate - instance = obj() - # Inject the filesystem-derived name - instance.name = tool_id - instance.setup() - return instance - - except ImportError as e: - print(f"āŒ Error importing {tool_id}: {e}") - return None - - def get_available_prompts(self): - """ - Scans all discoverable tools for functions decorated with @mcp.prompt. - Returns a set of prompt names (personas). We need this to validate a plan. - A plan is not valid if it names a persona (prompt) that is not known. - """ - prompts = set() - - # 2. Load them (to execute decorators) - for tool_id, path in self.tools.items(): - mod = self.load_tool_module(tool_id, path) - if not mod: - continue + core_funcs = [ + event_tools.list_event_streams, + event_tools.subscribe, + event_tools.unsubscribe, + ] - # 3. 
Inspect the classes/functions in the module - for name, obj in inspect.getmembers(mod): - # We usually look for classes inheriting from BaseTool - # But we can also just scan the class attributes - if inspect.isclass(obj): - for attr_name in dir(obj): - try: - func = getattr(obj, attr_name) - except: - continue - - # CHECK FOR PROXY TAG - if callable(func) and getattr(func, "_is_mcp_prompt", False): - # Get the name from the decorator - p_name = getattr(func, "_mcp_name", None) - if p_name: - prompts.add(p_name) - - return prompts + for func in core_funcs: + endpoint = Tool.from_function(func, name=func.__name__) + mcp.add_tool(endpoint) + self.registered_keys.add(f"tool:{func.__name__}") diff --git a/mcpserver/tools/prompts.py b/mcpserver/tools/prompts.py deleted file mode 100644 index a5bca61..0000000 --- a/mcpserver/tools/prompts.py +++ /dev/null @@ -1,2 +0,0 @@ -def format_rules(rules): - return "\n".join([f"- {r}" for r in rules]) diff --git a/mcpserver/tools/simple/tool.py b/mcpserver/tools/simple/tool.py index c2566d1..f75b800 100644 --- a/mcpserver/tools/simple/tool.py +++ b/mcpserver/tools/simple/tool.py @@ -7,9 +7,6 @@ class EchoTool(BaseTool): The EchoTool is primarily for testing. """ - def setup(self): - pass - @mcp.tool(name="simple_echo") def echo(self, message: str): """Echo the message back (return it)""" diff --git a/mcpserver/tools/status/prompts.py b/mcpserver/tools/status/prompts.py deleted file mode 100644 index 2a7c908..0000000 --- a/mcpserver/tools/status/prompts.py +++ /dev/null @@ -1,28 +0,0 @@ -import mcpserver.tools.prompts as prompts - -PERSONA = "You are a workflow status expert." - -CONTEXT = "We just completed a step in an orchestration. We need to determine the final status. If you see a return code and it is 0, you MUST indicate success." 
- -REQUIRES = [ - "You MUST return a single json structure with a single field 'action'", - "The 'action' must be 'failure' or 'success'", -] - - -def get_status_text(content): - return f""" -### PERSONA -{PERSONA} - -### CONTEXT -{CONTEXT} - -### GOAL -Look at the step output and determine if the step has failed or succeeded. -{content} - -### INSTRUCTIONS -You must adhere to these rules strictly: -{prompts.format_rules(REQUIRES)} -""" diff --git a/mcpserver/tools/status/tool.py b/mcpserver/tools/status/tool.py deleted file mode 100644 index c556a37..0000000 --- a/mcpserver/tools/status/tool.py +++ /dev/null @@ -1,16 +0,0 @@ -import mcpserver.tools.status.prompts as prompts -from mcpserver.tools.base import BaseTool -from mcpserver.tools.decorator import mcp - - -class StatusTool(BaseTool): - - @mcp.prompt( - name="check_finished_prompt", description="Look at step outputs and determined if finished" - ) - def check_finished_prompt(self, content: str) -> dict: - """ - Generates agent instructions for determining if a step is completed, successful, failed. 
- """ - prompt_text = prompts.get_status_text(content) - return {"messages": [{"role": "user", "content": {"type": "text", "text": prompt_text}}]} diff --git a/mcpserver/tools/status/__init__.py b/mcpserver/tools/system/__init__.py similarity index 100% rename from mcpserver/tools/status/__init__.py rename to mcpserver/tools/system/__init__.py diff --git a/mcpserver/tools/system/system.py b/mcpserver/tools/system/system.py new file mode 100644 index 0000000..411e1b6 --- /dev/null +++ b/mcpserver/tools/system/system.py @@ -0,0 +1,55 @@ +import os +import time +from typing import Any, Dict, List + +from resource_secretary.agents.backends import get_backend +from resource_secretary.agents.secretary import SecretaryAgent +from resource_secretary.providers import discover_providers + +from mcpserver.tools.base import BaseTool +from mcpserver.tools.decorator import mcp + + +class SystemTool(BaseTool): + """ + Primary interface for cluster identity and negotiation. + """ + + def setup(self, manager=None): + self.manager = manager + self.catalog = discover_providers() + + # Capture model config from environment or manager defaults + # manager.args would contain the CLI values from populate_start_args + self.backend_config = { + "type": os.getenv("RESOURCE_SECRETARY_LLM"), + "model": os.getenv("RESOURCE_SECRETARY_MODEL"), + "base": os.getenv("RESOURCE_SECRETARY_API_BASE"), + } + + self.active_providers = [inst for category in self.catalog.values() for inst in category] + + def build_manifest(self) -> Dict[str, Any]: + manifest = {} + for category, instances in self.catalog.items(): + manifest[category] = {inst.name: inst.metadata for inst in instances} + return manifest + + def get_status(self) -> Dict[str, Any]: + return {"timestamp": time.time(), "manifest": self.build_manifest()} + + async def ask_secretary(self, request: str, verbose=False) -> Dict[str, Any]: + """ + Wakes up the local Secretary Agent using the configured backend. 
+ """ + # Resolve the backend instance on-demand + backend = get_backend( + backend_type=self.backend_config["type"], + model_name=self.backend_config["model"], + api_base=self.backend_config["base"], + ) + + agent = SecretaryAgent(self.active_providers, backend=backend, verbose=verbose) + proposal = await agent.negotiate(request) + + return {"proposal": proposal, "status": "SUCCESS"} diff --git a/mcpserver/utils/text.py b/mcpserver/utils/text.py index 83b43c7..89d824b 100644 --- a/mcpserver/utils/text.py +++ b/mcpserver/utils/text.py @@ -1,6 +1,33 @@ import re +def sanitize(name: str) -> str: + # Replace hyphens/dots with underscores + clean = name.replace("-", "_").replace(".", "_") + # Python identifiers cannot start with a digit + if clean[0].isdigit(): + clean = f"n_{clean}" + return clean + + +def format_rules(rules): + return "\n".join([f"- {r}" for r in rules]) + + +def extract_code_block(text): + """ + Match block of code, assuming llm returns as markdown or code block. + + This is (I think) a better variant. + """ + match = re.search(r"```(?:\w+)?\s*\n(.*?)\n\s*```", text, re.DOTALL) + # Extract content from ```json ... 
``` blocks if present + if match: + return match.group(1).strip() + # Fall back to returning stripped text + return text.strip() + + def get_code_block(content, code_type=None): """ Parse a code block from the response diff --git a/mcpserver/version.py b/mcpserver/version.py index a61f309..2ec0cf2 100644 --- a/mcpserver/version.py +++ b/mcpserver/version.py @@ -1,4 +1,4 @@ -__version__ = "0.0.15" +__version__ = "0.0.16" AUTHOR = "Vanessa Sochat" AUTHOR_EMAIL = "vsoch@users.noreply.github.com" NAME = "mcp-serve" @@ -24,6 +24,7 @@ ("textual", {"min_version": None}), ) +HUB_REQUIRES = (("resource-secretary[gemini,openai]", {"min_version": None}),) TESTS_REQUIRES = (("pytest", {"min_version": "4.6.2"}),) -INSTALL_REQUIRES_ALL = INSTALL_REQUIRES + TESTS_REQUIRES +INSTALL_REQUIRES_ALL = INSTALL_REQUIRES + TESTS_REQUIRES + HUB_REQUIRES diff --git a/setup.py b/setup.py index ab3850a..c465bd9 100644 --- a/setup.py +++ b/setup.py @@ -66,6 +66,7 @@ def get_reqs(lookup=None, key="INSTALL_REQUIRES"): INSTALL_REQUIRES = get_reqs(lookup) TESTS_REQUIRES = get_reqs(lookup, "TESTS_REQUIRES") INSTALL_REQUIRES_ALL = get_reqs(lookup, "INSTALL_REQUIRES_ALL") + HUB_REQUIRES = get_reqs(lookup, "HUB_REQUIRES") setup( name=NAME, @@ -87,6 +88,7 @@ def get_reqs(lookup=None, key="INSTALL_REQUIRES"): tests_require=TESTS_REQUIRES, extras_require={ "all": [INSTALL_REQUIRES_ALL], + "hub": [HUB_REQUIRES], }, classifiers=[ "Intended Audience :: Science/Research",
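A note on the `register_event` hunk above: providers are duck-typed rather than required to subclass anything, so any class exposing `get_metadata`, `subscribe`, and `unsubscribe` passes validation. Here is a minimal standalone sketch of a conforming provider (the class, topics, and metadata fields are hypothetical, and the validation and name-derivation logic is mirrored inline rather than imported from `mcpserver`):

```python
class MockEvents:
    """Hypothetical provider satisfying the duck-typed event interface."""

    def get_metadata(self):
        return {"source": "mock", "topics": ["job.start", "job.finish"]}

    def subscribe(self, topic, callback):
        # A real provider would register the callback with its event source
        pass

    def unsubscribe(self, topic):
        pass


# Mirrors the validation loop in register_event
instance = MockEvents()
required = ["get_metadata", "subscribe", "unsubscribe"]
missing = [req for req in required if not hasattr(instance, req)]
assert not missing, f"missing required methods: {missing}"

# Mirrors the provider name derivation: FluxEvents -> "flux", MockEvents -> "mock"
provider_name = MockEvents.__name__.lower().replace("events", "")
print(provider_name)  # mock
```

Because the check is `hasattr` only, signatures are not enforced; a provider with the wrong argument shapes would still register and fail later at call time.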
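And a quick sanity check of the new `mcpserver/utils/text.py` helpers; the two functions are copied inline from the diff so the snippet runs standalone:

```python
import re


def sanitize(name: str) -> str:
    # Replace hyphens/dots with underscores; identifiers cannot start with a digit
    clean = name.replace("-", "_").replace(".", "_")
    if clean[0].isdigit():
        clean = f"n_{clean}"
    return clean


def extract_code_block(text):
    # Prefer the body of a fenced block; fall back to the stripped text
    match = re.search(r"```(?:\w+)?\s*\n(.*?)\n\s*```", text, re.DOTALL)
    if match:
        return match.group(1).strip()
    return text.strip()


print(sanitize("9-lives.tool"))  # n_9_lives_tool

# Build a fenced LLM-style reply without literal backtick runs in this example
fence = "`" * 3
llm_reply = f'Here you go:\n{fence}json\n{{"action": "success"}}\n{fence}'
print(extract_code_block(llm_reply))  # {"action": "success"}
print(extract_code_block("no fence here"))  # no fence here
```

Note the non-greedy `(.*?)` with `re.DOTALL` stops at the first closing fence, so a reply containing multiple blocks yields only the first one.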