diff --git a/README.md b/README.md index 61c726e..e8ef1fe 100644 --- a/README.md +++ b/README.md @@ -66,6 +66,7 @@ The following variables can be set in the environment. | `MCPSERVER_TOKEN` | Token to use for testing | unset | + ## Usage ### Start the Server @@ -272,6 +273,27 @@ mcpserver start --config examples/jobspec/mcpserver.yaml --join http://0.0.0.0:8 mcpserver start --config examples/jobspec/mcpserver.yaml --join http://0.0.0.0:8000 --port 7777 --archetype hpc ``` +### Providers Interface + +When you bring up a hub and workers, by default we will use the [resource-secretary](https://github.com/converged-computing/resource-secretary) library to create real and/or mock providers to add to it. If you do not have this installed it will still come up, but without these tools for the agent. In addition, you can add your own catalogs of tools for your hierarchy. Add them to the mcpserver.yaml. Here is a simple example: + +```yaml +tools: + - path: flux_mcp.validate.flux_validate_jobspec + - path: hpc_mcp.filesystem.filesystem_write_file +catalogs: + - path: snakemake_agent.catalog.SnakemakeCatalog + name: snakemake +``` + +A catalog is a provider that is not probed for automatically, and they can be provisioned by the library here (as the example above) or externally. They typically are not probed for because the user should request using it. As an example, snakemake is going to clone snakemake-wrappers, which is a system change even if just writing to a temporary directory. You would never want to do that automatically. + +Adding this catalog will expose interfaces for your worker to provide Snakemake functions, more specifically wrappers, to use. If you want to implement a catalog, you can make a basic class with `probe` that needs to return True/False to determine if it should be used, and then appropriate functions you want to expose. We support the following decorators for the Worker Secretary Agent: + +- `workflow_tool`: A tool intended for use when the workflow agent is running a workflow +- `dispatch_tool`: revealed when a secretary agent is deciding to dispatch work. +- `secretary_tool`: revealed when a secretary agent is negotiating (e.g., "Can I satisfy this request?") + ### Mocking a Hub If you are doing experiments, you can bring up a hub the same way: @@ -430,6 +452,7 @@ Here are a few design choices (subject to change, of course). I am starting with - [ ] need way to "pass forward" an error from a worker that, for example, API key not set. - [ ] I want to have the equivalent of a satisfy endpoint, checking for the negotiate but not dispatch. - [ ] I also want an equivalent "just submit to this cluster" endpoint. +- [ ] likely we want an ability to one off disabling probing or specific providers Idea: diff --git a/examples/catalogs/README.md b/examples/catalogs/README.md new file mode 100644 index 0000000..96bc013 --- /dev/null +++ b/examples/catalogs/README.md @@ -0,0 +1,37 @@ +# Catalogs + +A catalog is a provider that needs to be explicitly added, and then will expose multiple different functions for an agent. +Let's first prepare some snakemake data: + +```bash +wget https://github.com/snakemake/snakemake-tutorial-data/archive/v5.4.5.tar.gz +tar --wildcards -xf v5.4.5.tar.gz --strip 1 "*/data" +``` + +Setup your conda: + +```bash +conda config --add channels defaults +conda config --add channels bioconda +conda config --add channels conda-forge +conda config --set channel_priority strict +pip install snakemake-wrapper-utils +``` + +Export envars so the snakemake catalog tools know EXACTLY where the data is (read only) and where to write (should be an empty directory for read/write) + +```bash +mkdir -p ./workdir +export RESOURCE_SECRETARY_SNAKEMAKE_WORKDIR=$(pwd)/workdir +export RESOURCE_SECRETARY_SNAKEMAKE_INPUT=$(pwd)/data +``` + +```bash +mcpserver start --config ./snakemake.yaml --dual --port 8089 +``` + +And now run fractale (and export needed tokens): + +```bash +fractale run --database json ./plan.yaml +``` diff --git a/examples/catalogs/plan.yaml b/examples/catalogs/plan.yaml new file mode 100644 index 0000000..77bd67c --- /dev/null +++ b/examples/catalogs/plan.yaml @@ -0,0 +1,13 @@ +name: Snakemake Tutorial Workflow +agents: + - path: fractale_agents.hpc.workflow.SnakemakeWorkflowAgent +steps: + - name: tutorial + type: agent + tool: snakemake-workflow + inputs: + goal: | + Map E. coli sequencing reads from samples A, B, and C to the reference genome, + sort and index the resulting alignments, then call genomic variants across all + three samples jointly. The reference genome and reads are in the INPUT_DIR. + Write all outputs to WORK_DIR. \ No newline at end of file diff --git a/examples/catalogs/snakemake.yaml b/examples/catalogs/snakemake.yaml new file mode 100644 index 0000000..33afa9f --- /dev/null +++ b/examples/catalogs/snakemake.yaml @@ -0,0 +1,3 @@ +catalogs: + - path: resource_secretary.providers.workflow.SnakemakeProvider + name: snakemake diff --git a/mcpserver/cli/args.py b/mcpserver/cli/args.py index 1ea53de..bd20993 100644 --- a/mcpserver/cli/args.py +++ b/mcpserver/cli/args.py @@ -38,6 +38,7 @@ def populate_start_args(start): "--event", action="append", help="Direct event stream to import.", default=[] ) start.add_argument("--tool", action="append", help="Direct tool to import.", default=[]) + start.add_argument("--catalog", action="append", help="Direct catalog to import.", default=[]) start.add_argument("--resource", action="append", help="Direct resource to import.", default=[]) start.add_argument("--prompt", action="append", help="Direct prompt to import.", default=[]) start.add_argument("--include", help="Include tags", action="append", default=None) @@ -111,13 +112,6 @@ def populate_start_args(start): action="store_true", default=False, ) - worker_group.add_argument( - "--label", - action="append", - dest="labels", - help="Custom labels in key=value format (e.g., --label gpu=h100). Can be used multiple times.", - ) - # const=True is what we get if the flag is present but no value is given # default=False is what we get if the flag is totally absent # THe user can also ask for an archetype (hpc, cloud, standalone) diff --git a/mcpserver/cli/manager.py b/mcpserver/cli/manager.py index 3324358..f233cad 100644 --- a/mcpserver/cli/manager.py +++ b/mcpserver/cli/manager.py @@ -25,7 +25,13 @@ def get_manager(mcp, cfg: MCPConfig): # A repeated function will return None if not endpoint: continue - print(f" {emoji} Registered: {endpoint.name}") + + # Catalog can include a listing of tools + if isinstance(endpoint, list): + for e in endpoint: + print(f" {emoji} Registered: {e.name}") + else: + print(f" {emoji} Registered: {endpoint.name}") # Handle SSL if cfg.server.ssl_keyfile is not None and cfg.server.ssl_certfile is not None: @@ -45,6 +51,7 @@ def register_explicit_capabilities(mcp, cfg: MCPConfig): # Map configuration lists to the manager's registration methods registries = [ (cfg.tools, manager.register_tool, "✅"), + (cfg.catalogs, manager.register_catalog, "📖"), (cfg.prompts, manager.register_prompt, "💬"), (cfg.resources, manager.register_resource, "⛰️"), (cfg.events, manager.register_event, "📡"), diff --git a/mcpserver/core/config.py b/mcpserver/core/config.py index 43ddd1d..7fab38a 100644 --- a/mcpserver/core/config.py +++ b/mcpserver/core/config.py @@ -51,6 +51,7 @@ class MCPConfig: discovery: List[str] = field(default_factory=list) tools: List[Capability] = field(default_factory=list) events: List[Capability] = field(default_factory=list) + catalogs: List[Capability] = field(default_factory=list) prompts: List[Capability] = field(default_factory=list) resources: List[Capability] = field(default_factory=list) @@ -92,6 +93,7 @@ def make_caps(key): discovery=data.get("discovery", []), tools=make_caps("tools"), events=make_caps("events"), + catalogs=make_caps("catalogs"), prompts=make_caps("prompts"), resources=make_caps("resources"), ) @@ -114,7 +116,8 @@ def from_args(cls, args): exclude=args.exclude, discovery=args.tool_module or [], tools=[Capability(path=t) for t in (args.tool or [])], - events=[Capability(path=e) for t in (args.event or [])], + events=[Capability(path=e) for e in (args.event or [])], + catalogs=[Capability(path=c) for c in (args.catalog or [])], prompts=[Capability(path=p) for p in (args.prompt or [])], resources=[Capability(path=r) for r in (args.resource or [])], ) diff --git a/mcpserver/core/hub.py b/mcpserver/core/hub.py index 2e22a8d..7295941 100644 --- a/mcpserver/core/hub.py +++ b/mcpserver/core/hub.py @@ -291,10 +291,7 @@ async def fetch_all_statuses(self) -> dict: async def status_handler(wid, sess): info = self.workers[wid] - base_metadata = { - "labels": info.get("labels", {}), - "url": info["url"], - } + base_metadata = {"url": info["url"]} mcp_result = await sess.call_tool("get_status", {}) raw_text = mcp_result.content[0].text @@ -329,7 +326,6 @@ async def register(request: Request): self.workers[wid] = { "url": wurl, "client": Client(wurl), - "labels": data.get("labels", {}), } # Discover tools in the background diff --git a/mcpserver/core/worker.py b/mcpserver/core/worker.py index 863152c..566c3d9 100644 --- a/mcpserver/core/worker.py +++ b/mcpserver/core/worker.py @@ -23,7 +23,6 @@ def __init__( secret: str, worker_id: Optional[str] = None, public_url: Optional[str] = None, - labels: Optional[list] = None, mock: Optional[bool] = False, verbose: Optional[bool] = False, ): @@ -39,9 +38,6 @@ def __init__( # Static Manifest for the worker self.manifest = self.build_manifest() - # Note from vsoch: not sure if this will be useful / what we should use for. - self.labels = self.parse_labels(labels) - # Register MCP Tools automatically self.register_agent_tools() @@ -65,19 +61,6 @@ def build_manifest(self) -> Dict[str, Any]: manifest[category] = {inst.name: inst.metadata for inst in instances} return manifest - def parse_labels(self, label_list: Optional[list]) -> dict: - """ - Converts ['key=val', 'key2=val2'] to a dictionary. - """ - labels = {} - if not label_list: - return labels - for item in label_list: - if "=" in item: - k, v = item.split("=", 1) - labels[k.strip()] = v.strip() - return labels - async def run_registration(self): """ Registers the worker with the Hub. @@ -88,7 +71,6 @@ async def run_registration(self): payload = { "id": self.worker_id, "url": self.public_url, - "labels": self.labels, "manifest": self.manifest, } headers = {"X-MCP-Token": self.secret} @@ -119,6 +101,5 @@ def from_args(cls, mcp, args, cfg) -> Optional["WorkerManager"]: secret=args.join_secret, worker_id=args.worker_id, public_url=public_url, - labels=args.labels, verbose=args.verbose, ) diff --git a/mcpserver/tools/manager.py b/mcpserver/tools/manager.py index 445e987..bdbe5f4 100644 --- a/mcpserver/tools/manager.py +++ b/mcpserver/tools/manager.py @@ -118,6 +118,37 @@ def register_tool(self, mcp, tool_path: str, name: str = None): self.registered_keys.add(f"tool:{actual_name}") return endpoint + def register_catalog(self, mcp, tool_path: str, name: str = None): + """ + Register a catalog (a provider) with tools + """ + + module_name, class_name = tool_path.rsplit(".", 1) + module = importlib.import_module(module_name) + + tools = [] + for attr_name, obj in inspect.getmembers(module, inspect.isclass): + if attr_name != class_name: + continue + + # Instantiate the class so methods are bound and self is resolved + instance = obj() + if hasattr(instance, "probe"): + instance.probe() + + for method_name, method in inspect.getmembers(instance, predicate=inspect.ismethod): + if ( + hasattr(method, "is_tool") + and method.is_tool + and f"tool:{method_name}" not in self.registered_keys + ): + endpoint = Tool.from_function(method, name=method_name) + mcp.add_tool(endpoint) + self.registered_keys.add(f"tool:{method_name}") + tools.append(endpoint) + + return tools + def register_resource(self, mcp, tool_path: str, name: str = None): """ Register a resource.