Skip to content

Commit 49ff4e2

Browse files
authored
feat: extend capability gate to all task types (#76)
1 parent d551a73 commit 49ff4e2

40 files changed

Lines changed: 478 additions & 82 deletions

docs/ARCHITECTURE.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -127,12 +127,12 @@ scripts/dev/ compile_protos, sync_requirements, check_env_examples
127127
`WorkerHardware`. The dispatcher's `_cached_worker_candidates` filters
128128
to workers whose cache covers the task's references; entries older
129129
than `WORKER_CACHE_TTL_SEC` are ignored.
130-
- **Worker capabilities.** Beyond hardware fit, each worker advertises a
131-
`WorkerCapabilities` describing the task kinds it can service. The dispatcher
132-
routes a task that requires a capability only to workers advertising it, and
133-
excludes a worker that advertises none from those tasks rather than failing
134-
them on it. SSH is one such capability — a worker advertises it when it can
135-
launch session containers.
130+
- **Worker capabilities.** Beyond hardware fit, each worker advertises the set
131+
of task types it can service, and the dispatcher routes a task only to workers
132+
that advertise its type. A worker advertises a type only when its executor came
133+
up — e.g. SSH requires a reachable Docker daemon, and training or omni types
134+
require their (often GPU-only) dependencies — so a worker missing that executor
135+
isn't a candidate, rather than being handed a task it would fail.
136136
- **Cursor pagination.** List endpoints accept `limit` and `before` /
137137
`after` cursors. The cursor is an opaque base64 of `(timestamp, id)`;
138138
do not parse client-side.

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ runtime-agent = [
8383
"arxiv>=2.2.0",
8484
"beautifulsoup4>=4.14.3",
8585
"chunkr-ai>=0.3.7",
86-
"crawl4ai>=0.8.9",
86+
"crawl4ai>=0.9.0",
8787
"ddgs>=9.10.0",
8888
"google-genai>=1.47.0",
8989
"hydra-core>=1.3.2",

sdk/src/flowmesh/models/workers.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
from pydantic import BaseModel, Field, field_validator
66

7+
from .common import TaskType
8+
79

810
class CPUInfo(BaseModel):
911
logical_cores: int | None = None
@@ -80,7 +82,7 @@ class SSHLimits(BaseModel):
8082

8183

8284
class WorkerCapabilities(BaseModel):
83-
ssh: bool = False
85+
supported_task_types: frozenset[TaskType] = Field(default_factory=frozenset)
8486

8587

8688
class Worker(BaseModel):

src/server/dispatcher/base.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ def dispatch_once(self, task_id: str) -> bool:
303303
self._requeue_task(task_id, reason="no_selection")
304304
return False
305305

306-
# 5.5. Plan task merge: coalesce sibling merge candidates onto this worker
306+
# Plan task merge: coalesce sibling merge candidates onto this worker
307307
merged_children: list[str] = []
308308
if self._task_merge_enabled and self._task_merge_max_batch_size > 1:
309309
merged_children = self._runtime.plan_merge(
@@ -341,11 +341,21 @@ def dispatch_once(self, task_id: str) -> bool:
341341
self._fail_task(task_id, str(exc), payload={"error": str(exc)})
342342
return True
343343

344-
# 6.5. Conditional execution: skip dispatch if condition not met
344+
# Re-validate resolved task spec
345+
try:
346+
rendered_task.spec.validate_dispatchable()
347+
except ValueError as exc:
348+
self._runtime.release_merge(task_id)
349+
self._fail_task(
350+
task_id, "spec_validation_failed", payload={"error": str(exc)}
351+
)
352+
return True
353+
354+
# Conditional execution: skip dispatch if condition not met
345355
if self._evaluate_condition_skip(task_id, rendered_task, record):
346356
return True
347357

348-
# 6.6. Resolve merged-child rendered payloads
358+
# Resolve merged-child rendered payloads
349359
rendered_children: list[MergedChildTaskStrict] | None = None
350360
if record.merged_children:
351361
rendered_children = []

src/server/registries/worker.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -550,9 +550,7 @@ def hw_satisfies(worker: Worker, task: TaskEnvelope) -> bool:
550550

551551

552552
def capability_satisfies(worker: Worker, task: TaskEnvelope) -> bool:
553-
if isinstance(task.spec, (SSHSpecStrict, SSHSpecTemplate)):
554-
return worker.capabilities.ssh
555-
return True
553+
return task.spec.taskType in worker.capabilities.supported_task_types
556554

557555

558556
def _gpu_meets_requirements(hw: WorkerHardware, gpu_req: GPURequirements) -> bool:

src/server/task/parser.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,10 @@ def _build_task_template(
161161
) from exc
162162
raise ValueError(f"Invalid task payload{context}: {exc}") from exc
163163
_validate_ssh_access_mode(task, context)
164+
try:
165+
task.spec.validate_dispatchable()
166+
except ValueError as exc:
167+
raise ValueError(f"Invalid task payload{context}: {exc}") from exc
164168
return task
165169

166170

src/shared/schemas/worker.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
from pydantic import BaseModel, Field
44

5+
from shared.tasks.task_type import TaskType
6+
57

68
class WorkerStatus(StrEnum):
79
UNKNOWN = "UNKNOWN"
@@ -33,8 +35,9 @@ class SSHLimits(BaseModel):
3335
class WorkerCapabilities(BaseModel):
3436
"""Task capabilities a worker advertises to the dispatcher."""
3537

36-
ssh: bool = Field(
37-
default=False, description="Whether the worker can run SSH session tasks."
38+
supported_task_types: frozenset[TaskType] = Field(
39+
default_factory=frozenset,
40+
description="Types of tasks this worker can service.",
3841
)
3942

4043

src/shared/tasks/components/model.py

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,6 @@ class ModelConfig(StrictBaseModel):
7676
diffusers: dict[str, Any] | None = None
7777
adapters: list[AdapterConfig] | None = None
7878

79-
def has_lora_adapter(self) -> bool:
80-
adapters = self.adapters or []
81-
for adapter in adapters:
82-
if (adapter.type or "").strip().lower() == "lora":
83-
return True
84-
return False
85-
8679

8780
class ModelConfigTemplate(TemplateBaseModel):
8881
source: ModelSourceTemplate | None = None
@@ -91,10 +84,3 @@ class ModelConfigTemplate(TemplateBaseModel):
9184
transformers: dict[str, Any] | None = None
9285
diffusers: dict[str, Any] | None = None
9386
adapters: list[AdapterConfigTemplate] | None = None
94-
95-
def has_lora_adapter(self) -> bool:
96-
adapters = self.adapters or []
97-
for adapter in adapters:
98-
if (adapter.type or "").strip().lower() == "lora":
99-
return True
100-
return False

src/shared/tasks/specs/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
TaskSpecTemplateBase,
66
)
77
from .diffusion import DiffusionSpecStrict, DiffusionSpecTemplate
8-
from .inference import InferenceSpecStrict, InferenceSpecTemplate
8+
from .inference import InferenceBackend, InferenceSpecStrict, InferenceSpecTemplate
99
from .misc import (
1010
AgentSpecStrict,
1111
AgentSpecTemplate,
@@ -56,6 +56,7 @@
5656
"DiffusionSpecTemplate",
5757
"ImageClassificationTrainingSpecStrict",
5858
"ImageClassificationTrainingSpecTemplate",
59+
"InferenceBackend",
5960
"InferenceSpecStrict",
6061
"InferenceSpecTemplate",
6162
"LoRASFTSpecStrict",

src/shared/tasks/specs/common.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,14 @@ def get_artifacts(self) -> list[str]:
9191
return []
9292
return artifacts.copy()
9393

94+
def validate_dispatchable(self) -> None:
95+
"""Validate spec-internal invariants for a runnable task.
96+
97+
Called at submit and again before dispatch. Overrides must raise ``ValueError``
98+
for misconfigurations.
99+
"""
100+
return None
101+
94102

95103
class TaskSpecTemplateBase(TemplateBaseModel):
96104
resources: ResourcesSpec | None = None
@@ -117,6 +125,15 @@ def get_artifacts(self) -> list[str]:
117125
return []
118126
return artifacts.copy()
119127

128+
def validate_dispatchable(self) -> None:
129+
"""Validate spec-internal invariants for a runnable task.
130+
131+
Called at submit and again before dispatch. Overrides must defer
132+
placeholder-dependent checks and raise ``ValueError`` for genuine
133+
misconfigurations.
134+
"""
135+
return None
136+
120137

121138
type TaskSpecBase = TaskSpecStrictBase | TaskSpecTemplateBase
122139

0 commit comments

Comments
 (0)