Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions docs/ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,12 @@ scripts/dev/ compile_protos, sync_requirements, check_env_examples
`WorkerHardware`. The dispatcher's `_cached_worker_candidates` filters
to workers whose cache covers the task's references; entries older
than `WORKER_CACHE_TTL_SEC` are ignored.
- **Worker capabilities.** Beyond hardware fit, each worker advertises a
`WorkerCapabilities` describing the task kinds it can service. The dispatcher
routes a task that requires a capability only to workers advertising it, and
excludes a worker that advertises none from those tasks rather than failing
them on it. SSH is one such capability — a worker advertises it when it can
launch session containers.
- **Worker capabilities.** Beyond hardware fit, each worker advertises the set
of task types it can service, and the dispatcher routes a task only to workers
that advertise its type. A worker advertises a type only when its executor came
up — e.g. SSH requires a reachable Docker daemon, and training or omni types
require their (often GPU-only) dependencies — so a worker missing that executor
isn't a candidate, rather than being handed a task it would fail.
- **Cursor pagination.** List endpoints accept `limit` and `before` /
`after` cursors. The cursor is an opaque base64 of `(timestamp, id)`;
do not parse client-side.
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ runtime-agent = [
"arxiv>=2.2.0",
"beautifulsoup4>=4.14.3",
"chunkr-ai>=0.3.7",
"crawl4ai>=0.8.9",
"crawl4ai>=0.9.0",
"ddgs>=9.10.0",
"google-genai>=1.47.0",
"hydra-core>=1.3.2",
Expand Down
4 changes: 3 additions & 1 deletion sdk/src/flowmesh/models/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

from pydantic import BaseModel, Field, field_validator

from .common import TaskType


class CPUInfo(BaseModel):
logical_cores: int | None = None
Expand Down Expand Up @@ -80,7 +82,7 @@ class SSHLimits(BaseModel):


class WorkerCapabilities(BaseModel):
ssh: bool = False
supported_task_types: frozenset[TaskType] = Field(default_factory=frozenset)


class Worker(BaseModel):
Expand Down
16 changes: 13 additions & 3 deletions src/server/dispatcher/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ def dispatch_once(self, task_id: str) -> bool:
self._requeue_task(task_id, reason="no_selection")
return False

# 5.5. Plan task merge: coalesce sibling merge candidates onto this worker
# Plan task merge: coalesce sibling merge candidates onto this worker
merged_children: list[str] = []
if self._task_merge_enabled and self._task_merge_max_batch_size > 1:
merged_children = self._runtime.plan_merge(
Expand Down Expand Up @@ -341,11 +341,21 @@ def dispatch_once(self, task_id: str) -> bool:
self._fail_task(task_id, str(exc), payload={"error": str(exc)})
return True

# 6.5. Conditional execution: skip dispatch if condition not met
# Re-validate resolved task spec
try:
rendered_task.spec.validate_dispatchable()
except ValueError as exc:
self._runtime.release_merge(task_id)
self._fail_task(
task_id, "spec_validation_failed", payload={"error": str(exc)}
)
return True

# Conditional execution: skip dispatch if condition not met
if self._evaluate_condition_skip(task_id, rendered_task, record):
return True

# 6.6. Resolve merged-child rendered payloads
# Resolve merged-child rendered payloads
rendered_children: list[MergedChildTaskStrict] | None = None
if record.merged_children:
rendered_children = []
Expand Down
4 changes: 1 addition & 3 deletions src/server/registries/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,9 +550,7 @@ def hw_satisfies(worker: Worker, task: TaskEnvelope) -> bool:


def capability_satisfies(worker: Worker, task: TaskEnvelope) -> bool:
if isinstance(task.spec, (SSHSpecStrict, SSHSpecTemplate)):
return worker.capabilities.ssh
return True
return task.spec.taskType in worker.capabilities.supported_task_types


def _gpu_meets_requirements(hw: WorkerHardware, gpu_req: GPURequirements) -> bool:
Expand Down
4 changes: 4 additions & 0 deletions src/server/task/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ def _build_task_template(
) from exc
raise ValueError(f"Invalid task payload{context}: {exc}") from exc
_validate_ssh_access_mode(task, context)
try:
task.spec.validate_dispatchable()
except ValueError as exc:
raise ValueError(f"Invalid task payload{context}: {exc}") from exc
return task


Expand Down
7 changes: 5 additions & 2 deletions src/shared/schemas/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from pydantic import BaseModel, Field

from shared.tasks.task_type import TaskType


class WorkerStatus(StrEnum):
UNKNOWN = "UNKNOWN"
Expand Down Expand Up @@ -33,8 +35,9 @@ class SSHLimits(BaseModel):
class WorkerCapabilities(BaseModel):
"""Task capabilities a worker advertises to the dispatcher."""

ssh: bool = Field(
default=False, description="Whether the worker can run SSH session tasks."
supported_task_types: frozenset[TaskType] = Field(
default_factory=frozenset,
description="Types of tasks this worker can service.",
)


Expand Down
14 changes: 0 additions & 14 deletions src/shared/tasks/components/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,6 @@ class ModelConfig(StrictBaseModel):
diffusers: dict[str, Any] | None = None
adapters: list[AdapterConfig] | None = None

def has_lora_adapter(self) -> bool:
adapters = self.adapters or []
for adapter in adapters:
if (adapter.type or "").strip().lower() == "lora":
return True
return False


class ModelConfigTemplate(TemplateBaseModel):
source: ModelSourceTemplate | None = None
Expand All @@ -91,10 +84,3 @@ class ModelConfigTemplate(TemplateBaseModel):
transformers: dict[str, Any] | None = None
diffusers: dict[str, Any] | None = None
adapters: list[AdapterConfigTemplate] | None = None

def has_lora_adapter(self) -> bool:
adapters = self.adapters or []
for adapter in adapters:
if (adapter.type or "").strip().lower() == "lora":
return True
return False
3 changes: 2 additions & 1 deletion src/shared/tasks/specs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
TaskSpecTemplateBase,
)
from .diffusion import DiffusionSpecStrict, DiffusionSpecTemplate
from .inference import InferenceSpecStrict, InferenceSpecTemplate
from .inference import InferenceBackend, InferenceSpecStrict, InferenceSpecTemplate
from .misc import (
AgentSpecStrict,
AgentSpecTemplate,
Expand Down Expand Up @@ -56,6 +56,7 @@
"DiffusionSpecTemplate",
"ImageClassificationTrainingSpecStrict",
"ImageClassificationTrainingSpecTemplate",
"InferenceBackend",
"InferenceSpecStrict",
"InferenceSpecTemplate",
"LoRASFTSpecStrict",
Expand Down
17 changes: 17 additions & 0 deletions src/shared/tasks/specs/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ def get_artifacts(self) -> list[str]:
return []
return artifacts.copy()

def validate_dispatchable(self) -> None:
"""Validate spec-internal invariants for a runnable task.

Called at submit and again before dispatch. Overrides must raise ``ValueError``
for misconfigurations.
"""
return None


class TaskSpecTemplateBase(TemplateBaseModel):
resources: ResourcesSpec | None = None
Expand All @@ -117,6 +125,15 @@ def get_artifacts(self) -> list[str]:
return []
return artifacts.copy()

def validate_dispatchable(self) -> None:
"""Validate spec-internal invariants for a runnable task.

Called at submit and again before dispatch. Overrides must defer
placeholder-dependent checks and raise ``ValueError`` for genuine
misconfigurations.
"""
return None


type TaskSpecBase = TaskSpecStrictBase | TaskSpecTemplateBase

Expand Down
77 changes: 77 additions & 0 deletions src/shared/tasks/specs/inference.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from enum import StrEnum
from typing import Literal

from ..placeholders import TemplateBool, TemplateInt
Expand All @@ -10,17 +11,93 @@
)


class InferenceBackend(StrEnum):
"""The executor backend an inference task resolves to."""

TRANSFORMERS = "transformers"
VLLM = "vllm"
AUTO = "auto"


class InferenceSpecStrict(ModelInferSpecStrict):
taskType: Literal[TaskType.INFERENCE]

sloSeconds: int | None = None
parallel: ParallelSpec | None = None
enforce_cpu: bool | None = None

def backend(self) -> InferenceBackend:
return _inference_backend(self)

def validate_dispatchable(self) -> None:
_validate_inference_dispatchable(self)


class InferenceSpecTemplate(ModelInferSpecTemplate):
taskType: Literal[TaskType.INFERENCE]

sloSeconds: TemplateInt | None = None
parallel: ParallelSpecTemplate | None = None
enforce_cpu: TemplateBool | None = None

def backend(self) -> InferenceBackend:
return _inference_backend(self)

def validate_dispatchable(self) -> None:
_validate_inference_dispatchable(self)


def _inference_backend(
spec: InferenceSpecStrict | InferenceSpecTemplate,
) -> InferenceBackend:
"""Classify the executor backend an inference task resolves to.

The HF transformers executor runs on a GPU when one is available, so only ``VLLM``
(explicit vLLM config or adapters) strictly requires a GPU. ``AUTO`` is the unhinted
case: the runner prefers vLLM but falls back to the transformers executor when vLLM
is absent.
"""
if spec.enforce_cpu is True:
return InferenceBackend.TRANSFORMERS
model = spec.model
if model is None:
return InferenceBackend.AUTO
if model.vllm or model.adapters:
return InferenceBackend.VLLM
if model.transformers:
return InferenceBackend.TRANSFORMERS
return InferenceBackend.AUTO


def _validate_inference_dispatchable(
spec: InferenceSpecStrict | InferenceSpecTemplate,
) -> None:
model = spec.model
if model and (adapters := model.adapters):
for adapter in adapters:
if not adapter.path and not adapter.url and not adapter.task_id:
raise ValueError(
f"adapter {adapter.name or adapter.type!r} specifies no path, "
"url, or task_id and cannot be loaded."
)

if isinstance(spec.enforce_cpu, str): # Unresolved template placeholder
return
if spec.enforce_cpu is True:
if model and model.vllm:
raise ValueError(
"enforce_cpu is set but the model configures a vLLM backend."
)
return

if spec.backend() is not InferenceBackend.VLLM:
return
hardware = spec.resources.hardware if spec.resources else None
gpu = hardware.gpu if hardware else None
if gpu and gpu.count is not None and gpu.count >= 1:
return
raise ValueError(
"inference task resolves to the vLLM backend but requests no GPU. Set "
"spec.resources.hardware.gpu.count >= 1, or use enforce_cpu / a "
"transformers-only model for CPU inference."
)
9 changes: 5 additions & 4 deletions src/worker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,11 @@ The SSH session container is labelled with `flowmesh.ssh.task_id` and
`flowmesh.ssh.worker_id` so the server can clean it up if the worker
container is stopped externally.

At startup the worker initializes the SSH executor only if the Docker daemon is
reachable, and advertises its `ssh` capability accordingly; the dispatcher then
selects it for SSH tasks only when it can actually run them. (The supervisor
mounts the Docker socket only when SSH is enabled for the node; see
At startup the worker advertises the task types it can service, and the
dispatcher only routes a task to a worker that advertises its type. A worker
advertises a type only when its executor came up: SSH requires a reachable
Docker daemon, so `ssh` is offered only on Docker-capable workers. (The
supervisor mounts the Docker socket only when SSH is enabled for the node; see
`ENABLE_SSH_BY_DEFAULT` below.)

Relevant env vars for SSH tasks:
Expand Down
2 changes: 2 additions & 0 deletions src/worker/executors/agent_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from shared.schemas.artifact import ArtifactRef
from shared.schemas.result import BaseExecutorResult
from shared.tasks.specs import AgentSpecStrict
from shared.tasks.task_type import TaskType

from .base_executor import ExecutionError, Executor, ExecutorTask
from .utils.checkpoints import maybe_upload_artifacts, write_executor_result
Expand Down Expand Up @@ -66,6 +67,7 @@ class AgentExecutor(Executor):
"""Agent executor using youtu-agent (utu) framework"""

name = "agent"
supported_task_types = frozenset({TaskType.AGENT})

def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
Expand Down
2 changes: 2 additions & 0 deletions src/worker/executors/api_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from shared.schemas.result import BaseExecutorResult
from shared.tasks.specs import ApiSpecStrict
from shared.tasks.task_type import TaskType

from .base_executor import ExecutionError, Executor, ExecutorTask

Expand Down Expand Up @@ -40,6 +41,7 @@ class APIExecutor(Executor):
"""

name = "api"
supported_task_types = frozenset({TaskType.API})

# ---- Class-level connection pool (shared across all instances) ----
_clients: ClassVar[dict[_ClientKey, httpx.Client]] = {}
Expand Down
8 changes: 6 additions & 2 deletions src/worker/executors/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ def run(self, task: ExecutorTask, out_dir: Path) -> MyResult:
import json
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, TypeVar
from typing import Any, ClassVar, TypeVar

from shared.schemas.result import BaseExecutorResult
from shared.tasks import MergedChildTaskStrict
from shared.tasks.specs import TaskSpecStrictBase
from shared.tasks.task_type import TaskType
from shared.tasks.worker_message import WorkerHardware, WorkerTaskMessage
from worker.config import WorkerConfig
from worker.lifecycle import Lifecycle
Expand Down Expand Up @@ -67,8 +68,10 @@ class Executor(ABC):
Subclasses must implement `run` and may override `prepare` and `teardown`.
"""

#: Human-readable identifier for logging/telemetry
name: str = "executor"
"""Human-readable identifier for logging/telemetry"""
supported_task_types: ClassVar[frozenset[TaskType]] = frozenset()
"""Types of tasks this executor can service"""

def __init__(
self,
Expand Down Expand Up @@ -172,6 +175,7 @@ class EchoResult(BaseExecutorResult):

class EchoExecutor(Executor):
name = "echo"
supported_task_types = frozenset({TaskType.ECHO})

def run(self, task: ExecutorTask, out_dir: Path) -> EchoResult:
return EchoResult(
Expand Down
2 changes: 2 additions & 0 deletions src/worker/executors/data_profiling_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from shared.schemas.result import BaseExecutorResult
from shared.tasks.specs import DataProfilingSpecStrict
from shared.tasks.task_type import TaskType
from shared.utils.json import to_json_serializable, validate_keys

from ..connectors import get_connector_from_spec
Expand All @@ -31,6 +32,7 @@ class DataProfilingExecutor(DataMixin, Executor):
"""Executor that estimates SQL query costs by sampling SQL template params."""

name = "data_profiling"
supported_task_types = frozenset({TaskType.DATA_PROFILING})

def run(self, task: ExecutorTask, out_dir: Path) -> DataProfilingResult:
spec = self.require_spec(task, DataProfilingSpecStrict)
Expand Down
Loading
Loading