Skip to content

Commit b8f4748

Browse files
authored
fix: gate SSH dispatch to Docker-capable workers (#74)
1 parent 769e416 commit b8f4748

23 files changed

Lines changed: 445 additions & 140 deletions

docs/ARCHITECTURE.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,12 @@ scripts/dev/ compile_protos, sync_requirements, check_env_examples
127127
`WorkerHardware`. The dispatcher's `_cached_worker_candidates` filters
128128
to workers whose cache covers the task's references; entries older
129129
than `WORKER_CACHE_TTL_SEC` are ignored.
130+
- **Worker capabilities.** Beyond hardware fit, each worker advertises a
131+
`WorkerCapabilities` describing the task kinds it can service. The dispatcher
132+
routes a task that requires a capability only to workers advertising it, and
133+
excludes a worker that advertises none from those tasks rather than failing
134+
them on it. SSH is one such capability — a worker advertises it when it can
135+
launch session containers.
130136
- **Cursor pagination.** List endpoints accept `limit` and `before` /
131137
`after` cursors. The cursor is an opaque base64 of `(timestamp, id)`;
132138
do not parse client-side.

pyproject.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ all = [
2323
# Runtime/container dependencies.
2424
runtime-server = [
2525
"aiohttp>=3.14.0",
26-
"cryptography>=46.0.7",
26+
"cryptography>=48.0.1",
2727
"docker>=7.1.0",
2828
"fastapi>=0.135.0",
2929
"flowmesh-hook",
@@ -32,7 +32,7 @@ runtime-server = [
3232
"lumid-hooks>=0.2.0",
3333
"protobuf>=5.29.6",
3434
"pydantic>=2.12.3",
35-
"python-multipart>=0.0.26",
35+
"python-multipart>=0.0.31",
3636
"pyyaml>=6.0.2",
3737
"redis>=7.0.1",
3838
"requests>=2.33.0",
@@ -83,7 +83,7 @@ runtime-agent = [
8383
"arxiv>=2.2.0",
8484
"beautifulsoup4>=4.14.3",
8585
"chunkr-ai>=0.3.7",
86-
"crawl4ai>=0.8.0",
86+
"crawl4ai>=0.8.9",
8787
"ddgs>=9.10.0",
8888
"google-genai>=1.47.0",
8989
"hydra-core>=1.3.2",

sdk/src/flowmesh/models/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
SSHLimits,
4343
StorageInfo,
4444
Worker,
45+
WorkerCapabilities,
4546
WorkerHardware,
4647
WorkerInfo,
4748
)
@@ -91,6 +92,7 @@
9192
"TaskType",
9293
"TaskUsage",
9394
"Worker",
95+
"WorkerCapabilities",
9496
"WorkerHardware",
9597
"WorkerInfo",
9698
"WorkerRegisterResponse",

sdk/src/flowmesh/models/workers.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ class SSHLimits(BaseModel):
7979
max_pids: int | None = None
8080

8181

82+
class WorkerCapabilities(BaseModel):
83+
ssh: bool = False
84+
85+
8286
class Worker(BaseModel):
8387
id: str
8488
alias: str | None = None
@@ -91,6 +95,7 @@ class Worker(BaseModel):
9195
pid: int | None = None
9296
env: dict[str, Any] = Field(default_factory=dict)
9397
hardware: WorkerHardware | None = None
98+
capabilities: WorkerCapabilities = Field(default_factory=WorkerCapabilities)
9499
ssh_limits: SSHLimits | None = None
95100
tags: list[str] = Field(default_factory=list)
96101
last_seen: str | None = None

src/server/dispatcher/base.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,8 @@ def dispatch_once(self, task_id: str) -> bool:
178178
task_id,
179179
record,
180180
reason="no_eligible_worker",
181-
message="No worker satisfies the task hardware requirements",
181+
message="No worker satisfies the task hardware and capability "
182+
"requirements",
182183
)
183184
if not (eligible - failed_ids):
184185
return self._grace_then_fail_exhausted(task_id, record, failed_ids)

src/server/registries/worker.py

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
StopMessage,
1010
TaskMessage,
1111
)
12-
from shared.schemas.worker import SSHLimits
12+
from shared.schemas.worker import SSHLimits, WorkerCapabilities
1313
from shared.tasks import TaskEnvelope
1414
from shared.tasks.components.resources import GPURequirements
1515
from shared.tasks.specs import SSHSpecStrict, SSHSpecTemplate
@@ -55,6 +55,10 @@ class Worker(BaseModel):
5555
hardware: WorkerHardware | None = Field(
5656
default=None, description="Hardware metadata."
5757
)
58+
capabilities: WorkerCapabilities = Field(
59+
default_factory=WorkerCapabilities,
60+
description="Task capabilities the worker advertises.",
61+
)
5862
ssh_limits: SSHLimits | None = Field(
5963
default=None, description="Configured ceiling on SSH session resources."
6064
)
@@ -372,18 +376,18 @@ def idle_satisfying_pool(self, task: TaskEnvelope) -> list[Worker]:
372376
continue
373377
if self.is_worker_stale(worker.id):
374378
continue
375-
if hw_satisfies(worker, task):
379+
if hw_satisfies(worker, task) and capability_satisfies(worker, task):
376380
available.append(worker)
377381
return self.sort_workers(available)
378382

379383
def satisfying_workers(self, task: TaskEnvelope) -> list[Worker]:
380-
"""Non-stale workers whose hardware satisfies the task, any status."""
384+
"""Non-stale workers whose hardware and capabilities satisfy the task."""
381385
available: list[Worker] = []
382386
for worker_id in self.get_worker_ids():
383387
worker = self.get_worker(worker_id)
384388
if not worker or self.is_worker_stale(worker.id):
385389
continue
386-
if hw_satisfies(worker, task):
390+
if hw_satisfies(worker, task) and capability_satisfies(worker, task):
387391
available.append(worker)
388392
return self.sort_workers(available)
389393

@@ -545,6 +549,12 @@ def hw_satisfies(worker: Worker, task: TaskEnvelope) -> bool:
545549
return True
546550

547551

552+
def capability_satisfies(worker: Worker, task: TaskEnvelope) -> bool:
553+
if isinstance(task.spec, (SSHSpecStrict, SSHSpecTemplate)):
554+
return worker.capabilities.ssh
555+
return True
556+
557+
548558
def _gpu_meets_requirements(hw: WorkerHardware, gpu_req: GPURequirements) -> bool:
549559
required_count = gpu_req.count
550560
if required_count is not None:
@@ -642,6 +652,12 @@ def _ensure_str_list(items: Any) -> list[str]:
642652
if hardware_json is None
643653
else WorkerHardware.model_validate_json(hardware_json)
644654
)
655+
capabilities_json = value.get("capabilities_json")
656+
capabilities = (
657+
WorkerCapabilities()
658+
if capabilities_json is None
659+
else WorkerCapabilities.model_validate_json(capabilities_json)
660+
)
645661
ssh_limits_json = value.get("ssh_limits_json")
646662
ssh_limits = (
647663
None
@@ -677,6 +693,7 @@ def _ensure_str_list(items: Any) -> list[str]:
677693
pid=pid,
678694
env=env,
679695
hardware=hardware,
696+
capabilities=capabilities,
680697
ssh_limits=ssh_limits,
681698
tags=tags,
682699
last_seen=value.get("last_seen"),

src/server/requirements.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
# AUTO-GENERATED by scripts/dev/sync_requirements.py — do not edit by hand.
22
# Regenerate with: uv run scripts/dev/sync_requirements.py --write
33
aiohttp==3.14.1
4-
cryptography==47.0.0
4+
cryptography==49.0.0
55
docker==7.1.0
66
fastapi==0.136.1
77
grpcio==1.76.0
88
httpx==0.28.1
99
lumid-hooks==0.2.0
1010
protobuf==5.29.6
1111
pydantic==2.12.3
12-
python-multipart==0.0.27
12+
python-multipart==0.0.32
1313
pyyaml==6.0.2
1414
redis==7.0.1
1515
requests==2.33.1

src/shared/schemas/worker.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,12 @@ class SSHLimits(BaseModel):
3030
)
3131

3232

33-
__all__ = ["SSHLimits", "WorkerStatus"]
33+
class WorkerCapabilities(BaseModel):
34+
"""Task capabilities a worker advertises to the dispatcher."""
35+
36+
ssh: bool = Field(
37+
default=False, description="Whether the worker can run SSH session tasks."
38+
)
39+
40+
41+
__all__ = ["SSHLimits", "WorkerCapabilities", "WorkerStatus"]

src/worker/README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,12 @@ The SSH session container is labelled with `flowmesh.ssh.task_id` and
119119
`flowmesh.ssh.worker_id` so the server can clean it up if the worker
120120
container is stopped externally.
121121

122+
At startup the worker initializes the SSH executor only if the Docker daemon is
123+
reachable, and advertises its `ssh` capability accordingly; the dispatcher then
124+
selects it for SSH tasks only when it can actually run them. (The supervisor
125+
mounts the Docker socket only when SSH is enabled for the node; see
126+
`ENABLE_SSH_BY_DEFAULT` below.)
127+
122128
Relevant env vars for SSH tasks:
123129

124130
| Variable | Default | Description |

src/worker/executors/__init__.py

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,55 +1,60 @@
11
import importlib
22

3+
from .base_executor import Executor
4+
35
_IMPORT_ERRORS: dict[str, str] = {}
46

57

6-
def _safe_import(name: str, module: str) -> type | None:
8+
def _import_executor(name: str, module: str) -> type[Executor] | None:
79
try:
810
pkg = importlib.import_module(module, package=__package__)
9-
return getattr(pkg, name)
11+
if issubclass(cls := getattr(pkg, name), Executor):
12+
return cls
13+
error = f"{name} is not a subclass of Executor"
1014
except Exception as exc:
11-
_IMPORT_ERRORS[name] = str(exc)
12-
return None
15+
error = str(exc)
16+
_IMPORT_ERRORS[name] = error
17+
return None
1318

1419

15-
VLLMExecutor = _safe_import("VLLMExecutor", ".vllm_executor")
16-
VLLMLoRAExecutor = _safe_import("VLLMLoRAExecutor", ".vllm_lora_executor")
17-
PPOExecutor = _safe_import("PPOExecutor", ".ppo_executor")
18-
DPOExecutor = _safe_import("DPOExecutor", ".dpo_executor")
19-
SFTExecutor = _safe_import("SFTExecutor", ".sft_executor")
20-
LoRASFTExecutor = _safe_import("LoRASFTExecutor", ".lora_sft_executor")
21-
ImageClassificationTrainingExecutor = _safe_import(
20+
VLLMExecutor = _import_executor("VLLMExecutor", ".vllm_executor")
21+
VLLMLoRAExecutor = _import_executor("VLLMLoRAExecutor", ".vllm_lora_executor")
22+
PPOExecutor = _import_executor("PPOExecutor", ".ppo_executor")
23+
DPOExecutor = _import_executor("DPOExecutor", ".dpo_executor")
24+
SFTExecutor = _import_executor("SFTExecutor", ".sft_executor")
25+
LoRASFTExecutor = _import_executor("LoRASFTExecutor", ".lora_sft_executor")
26+
ImageClassificationTrainingExecutor = _import_executor(
2227
"ImageClassificationTrainingExecutor", ".image_classification_executor"
2328
)
24-
HFTransformersExecutor = _safe_import(
29+
HFTransformersExecutor = _import_executor(
2530
"HFTransformersExecutor", ".transformers_executor"
2631
)
27-
RAGExecutor = _safe_import("RAGExecutor", ".rag_executor")
28-
AgentExecutor = _safe_import("AgentExecutor", ".agent_executor")
29-
EchoExecutor = _safe_import("EchoExecutor", ".echo_executor")
30-
DataProfilingExecutor = _safe_import(
32+
RAGExecutor = _import_executor("RAGExecutor", ".rag_executor")
33+
AgentExecutor = _import_executor("AgentExecutor", ".agent_executor")
34+
EchoExecutor = _import_executor("EchoExecutor", ".echo_executor")
35+
DataProfilingExecutor = _import_executor(
3136
"DataProfilingExecutor", ".data_profiling_executor"
3237
)
33-
DataRetrievalExecutor = _safe_import(
38+
DataRetrievalExecutor = _import_executor(
3439
"DataRetrievalExecutor", ".data_retrieval_executor"
3540
)
36-
DiffusersExecutor = _safe_import("DiffusersExecutor", ".diffusers_executor")
37-
APIExecutor = _safe_import("APIExecutor", ".api_executor")
38-
SSHExecutor = _safe_import("SSHExecutor", ".ssh_executor")
39-
OmniText2ImageExecutor = _safe_import(
41+
DiffusersExecutor = _import_executor("DiffusersExecutor", ".diffusers_executor")
42+
APIExecutor = _import_executor("APIExecutor", ".api_executor")
43+
SSHExecutor = _import_executor("SSHExecutor", ".ssh_executor")
44+
OmniText2ImageExecutor = _import_executor(
4045
"OmniText2ImageExecutor", ".omni_text2image_executor"
4146
)
42-
OmniText2SpeechExecutor = _safe_import(
47+
OmniText2SpeechExecutor = _import_executor(
4348
"OmniText2SpeechExecutor", ".omni_text2speech_executor"
4449
)
45-
OmniText2AudioExecutor = _safe_import(
50+
OmniText2AudioExecutor = _import_executor(
4651
"OmniText2AudioExecutor", ".omni_text2audio_executor"
4752
)
48-
OmniText2GeneralExecutor = _safe_import(
53+
OmniText2GeneralExecutor = _import_executor(
4954
"OmniText2GeneralExecutor", ".omni_text2general_executor"
5055
)
5156

52-
EXECUTOR_REGISTRY: dict[str, type | None] = {
57+
EXECUTOR_REGISTRY: dict[str, type[Executor] | None] = {
5358
"vllm": VLLMExecutor,
5459
"vllm_lora": VLLMLoRAExecutor,
5560
"ppo": PPOExecutor,

0 commit comments

Comments
 (0)