Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ The following variables can be set in the environment.
| `MCPSERVER_TOKEN` | Token to use for testing | unset |



## Usage

### Start the Server
Expand Down Expand Up @@ -272,6 +273,27 @@ mcpserver start --config examples/jobspec/mcpserver.yaml --join http://0.0.0.0:8
mcpserver start --config examples/jobspec/mcpserver.yaml --join http://0.0.0.0:8000 --port 7777 --archetype hpc
```

### Providers Interface

When you bring up a hub and workers, by default we will use the [resource-secretary](https://github.com/converged-computing/resource-secretary) library to create real and/or mock providers to add to it. If you do not have this installed it will still come up, but without these tools for the agent. In addition, you can add your own catalogs of tools for your hierarchy. Add them to the mcpserver.yaml. Here is a simple example:

```yaml
tools:
- path: flux_mcp.validate.flux_validate_jobspec
- path: hpc_mcp.filesystem.filesystem_write_file
catalogs:
- path: snakemake_agent.catalog.SnakemakeCatalog
name: snakemake
```

A catalog is a provider that is not probed for automatically, and they can be provisioned by the library here (as the example above) or externally. They typically are not probed for because the user should request using it. As an example, snakemake is going to clone snakemake-wrappers, which is a system change even if just writing to a temporary directory. You would never want to do that automatically.

Adding this catalog will expose interfaces for your worker to provide Snakemake functions, more specifically wrappers, to use. If you want to implement a catalog, you can make a basic class with `probe` that needs to return True/False to determine if it should be used, and then appropriate functions you want to expose. We support the following decorators for the Worker Secretary Agent:

- `workflow_tool`: A tool intended for use when the workflow agent is running a workflow
- `dispatch_tool`: revealed when a secretary agent is deciding to dispatch work.
- `secretary_tool`: revealed when a secretary agent is negotiating (e.g., "Can I satisfy this request?")

### Mocking a Hub

If you are doing experiments, you can bring up a hub the same way:
Expand Down Expand Up @@ -430,6 +452,7 @@ Here are a few design choices (subject to change, of course). I am starting with
- [ ] need way to "pass forward" an error from a worker that, for example, API key not set.
- [ ] I want to have the equivalent of a satisfy endpoint, checking for the negotiate but not dispatch.
- [ ] I also want an equivalent "just submit to this cluster" endpoint.
- [ ] likely we want an ability to one off disabling probing or specific providers

Idea:

Expand Down
37 changes: 37 additions & 0 deletions examples/catalogs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Catalogs

A catalog is a provider that needs to be explicitly added, and then will expose multiple different functions for an agent.
Let's first prepare some snakemake data:

```bash
wget https://github.com/snakemake/snakemake-tutorial-data/archive/v5.4.5.tar.gz
tar --wildcards -xf v5.4.5.tar.gz --strip 1 "*/data"
```

Setup your conda:

```bash
conda config --add channels defaults
conda config --add channels bioconda
conda config --add channels conda-forge
conda config --set channel_priority strict
pip install snakemake-wrapper-utils
```

Export envars so the snakemake catalog tools know EXACTLY where the data is (read only) and where to write (should be an empty directory for read/write)

```bash
mkdir -p ./workdir
export RESOURCE_SECRETARY_SNAKEMAKE_WORKDIR=$(pwd)/workdir
export RESOURCE_SECRETARY_SNAKEMAKE_INPUT=$(pwd)/data
```

```bash
mcpserver start --config ./snakemake.yaml --dual --port 8089
```

And now run fractale (and export needed tokens):

```bash
fractale run --database json ./plan.yaml
```
13 changes: 13 additions & 0 deletions examples/catalogs/plan.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
name: Snakemake Tutorial Workflow
agents:
- path: fractale_agents.hpc.workflow.SnakemakeWorkflowAgent
steps:
- name: tutorial
type: agent
tool: snakemake-workflow
inputs:
goal: |
Map E. coli sequencing reads from samples A, B, and C to the reference genome,
sort and index the resulting alignments, then call genomic variants across all
three samples jointly. The reference genome and reads are in the INPUT_DIR.
Write all outputs to WORK_DIR.
3 changes: 3 additions & 0 deletions examples/catalogs/snakemake.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
catalogs:
- path: resource_secretary.providers.workflow.SnakemakeProvider
name: snakemake
8 changes: 1 addition & 7 deletions mcpserver/cli/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def populate_start_args(start):
"--event", action="append", help="Direct event stream to import.", default=[]
)
start.add_argument("--tool", action="append", help="Direct tool to import.", default=[])
start.add_argument("--catalog", action="append", help="Direct catalog to import.", default=[])
start.add_argument("--resource", action="append", help="Direct resource to import.", default=[])
start.add_argument("--prompt", action="append", help="Direct prompt to import.", default=[])
start.add_argument("--include", help="Include tags", action="append", default=None)
Expand Down Expand Up @@ -111,13 +112,6 @@ def populate_start_args(start):
action="store_true",
default=False,
)
worker_group.add_argument(
"--label",
action="append",
dest="labels",
help="Custom labels in key=value format (e.g., --label gpu=h100). Can be used multiple times.",
)

# const=True is what we get if the flag is present but no value is given
# default=False is what we get if the flag is totally absent
# THe user can also ask for an archetype (hpc, cloud, standalone)
Expand Down
9 changes: 8 additions & 1 deletion mcpserver/cli/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@ def get_manager(mcp, cfg: MCPConfig):
# A repeated function will return None
if not endpoint:
continue
print(f" {emoji} Registered: {endpoint.name}")

# Catalog can include a listing of tools
if isinstance(endpoint, list):
for e in endpoint:
print(f" {emoji} Registered: {e.name}")
else:
print(f" {emoji} Registered: {endpoint.name}")

# Handle SSL
if cfg.server.ssl_keyfile is not None and cfg.server.ssl_certfile is not None:
Expand All @@ -45,6 +51,7 @@ def register_explicit_capabilities(mcp, cfg: MCPConfig):
# Map configuration lists to the manager's registration methods
registries = [
(cfg.tools, manager.register_tool, "✅"),
(cfg.catalogs, manager.register_catalog, "📖"),
(cfg.prompts, manager.register_prompt, "💬"),
(cfg.resources, manager.register_resource, "⛰️"),
(cfg.events, manager.register_event, "📡"),
Expand Down
5 changes: 4 additions & 1 deletion mcpserver/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class MCPConfig:
discovery: List[str] = field(default_factory=list)
tools: List[Capability] = field(default_factory=list)
events: List[Capability] = field(default_factory=list)
catalogs: List[Capability] = field(default_factory=list)
prompts: List[Capability] = field(default_factory=list)
resources: List[Capability] = field(default_factory=list)

Expand Down Expand Up @@ -92,6 +93,7 @@ def make_caps(key):
discovery=data.get("discovery", []),
tools=make_caps("tools"),
events=make_caps("events"),
catalogs=make_caps("catalogs"),
prompts=make_caps("prompts"),
resources=make_caps("resources"),
)
Expand All @@ -114,7 +116,8 @@ def from_args(cls, args):
exclude=args.exclude,
discovery=args.tool_module or [],
tools=[Capability(path=t) for t in (args.tool or [])],
events=[Capability(path=e) for t in (args.event or [])],
events=[Capability(path=e) for e in (args.event or [])],
catalogs=[Capability(path=c) for c in (args.catalog or [])],
prompts=[Capability(path=p) for p in (args.prompt or [])],
resources=[Capability(path=r) for r in (args.resource or [])],
)
6 changes: 1 addition & 5 deletions mcpserver/core/hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,10 +291,7 @@ async def fetch_all_statuses(self) -> dict:

async def status_handler(wid, sess):
info = self.workers[wid]
base_metadata = {
"labels": info.get("labels", {}),
"url": info["url"],
}
base_metadata = {"url": info["url"]}

mcp_result = await sess.call_tool("get_status", {})
raw_text = mcp_result.content[0].text
Expand Down Expand Up @@ -329,7 +326,6 @@ async def register(request: Request):
self.workers[wid] = {
"url": wurl,
"client": Client(wurl),
"labels": data.get("labels", {}),
}

# Discover tools in the background
Expand Down
19 changes: 0 additions & 19 deletions mcpserver/core/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ def __init__(
secret: str,
worker_id: Optional[str] = None,
public_url: Optional[str] = None,
labels: Optional[list] = None,
mock: Optional[bool] = False,
verbose: Optional[bool] = False,
):
Expand All @@ -39,9 +38,6 @@ def __init__(
# Static Manifest for the worker
self.manifest = self.build_manifest()

# Note from vsoch: not sure if this will be useful / what we should use for.
self.labels = self.parse_labels(labels)

# Register MCP Tools automatically
self.register_agent_tools()

Expand All @@ -65,19 +61,6 @@ def build_manifest(self) -> Dict[str, Any]:
manifest[category] = {inst.name: inst.metadata for inst in instances}
return manifest

def parse_labels(self, label_list: Optional[list]) -> dict:
"""
Converts ['key=val', 'key2=val2'] to a dictionary.
"""
labels = {}
if not label_list:
return labels
for item in label_list:
if "=" in item:
k, v = item.split("=", 1)
labels[k.strip()] = v.strip()
return labels

async def run_registration(self):
"""
Registers the worker with the Hub.
Expand All @@ -88,7 +71,6 @@ async def run_registration(self):
payload = {
"id": self.worker_id,
"url": self.public_url,
"labels": self.labels,
"manifest": self.manifest,
}
headers = {"X-MCP-Token": self.secret}
Expand Down Expand Up @@ -119,6 +101,5 @@ def from_args(cls, mcp, args, cfg) -> Optional["WorkerManager"]:
secret=args.join_secret,
worker_id=args.worker_id,
public_url=public_url,
labels=args.labels,
verbose=args.verbose,
)
31 changes: 31 additions & 0 deletions mcpserver/tools/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,37 @@ def register_tool(self, mcp, tool_path: str, name: str = None):
self.registered_keys.add(f"tool:{actual_name}")
return endpoint

def register_catalog(self, mcp, tool_path: str, name: str = None):
"""
Register a catalog (a provider) with tools
"""

module_name, class_name = tool_path.rsplit(".", 1)
module = importlib.import_module(module_name)

tools = []
for attr_name, obj in inspect.getmembers(module, inspect.isclass):
if attr_name != class_name:
continue

# Instantiate the class so methods are bound and self is resolved
instance = obj()
if hasattr(instance, "probe"):
instance.probe()

for method_name, method in inspect.getmembers(instance, predicate=inspect.ismethod):
if (
hasattr(method, "is_tool")
and method.is_tool
and f"tool:{method_name}" not in self.registered_keys
):
endpoint = Tool.from_function(method, name=method_name)
mcp.add_tool(endpoint)
self.registered_keys.add(f"tool:{method_name}")
tools.append(endpoint)

return tools

def register_resource(self, mcp, tool_path: str, name: str = None):
"""
Register a resource.
Expand Down
Loading