Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions docs/ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,12 @@ scripts/dev/ compile_protos, sync_requirements, check_env_examples
`WorkerHardware`. The dispatcher's `_cached_worker_candidates` filters
to workers whose cache covers the task's references; entries older
than `WORKER_CACHE_TTL_SEC` are ignored.
- **Worker capabilities.** Beyond hardware fit, each worker advertises a
`WorkerCapabilities` describing the task kinds it can service. The dispatcher
routes a task that requires a capability only to workers advertising it, and
excludes a worker that advertises none from those tasks rather than failing
them on it. SSH is one such capability — a worker advertises it when it can
launch session containers.
- **Worker capabilities.** Beyond hardware fit, each worker advertises the set
of task types it can service, and the dispatcher routes a task only to workers
that advertise its type. A worker advertises a type only when its executor came
up — e.g. SSH requires a reachable Docker daemon, and training or omni types
require their (often GPU-only) dependencies — so a worker missing that executor
isn't a candidate, rather than being handed a task it would fail.
- **Cursor pagination.** List endpoints accept `limit` and `before` /
`after` cursors. The cursor is an opaque base64 of `(timestamp, id)`;
do not parse client-side.
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ runtime-agent = [
"arxiv>=2.2.0",
"beautifulsoup4>=4.14.3",
"chunkr-ai>=0.3.7",
"crawl4ai>=0.8.9",
"crawl4ai>=0.9.0",
"ddgs>=9.10.0",
"google-genai>=1.47.0",
"hydra-core>=1.3.2",
Expand Down
4 changes: 3 additions & 1 deletion sdk/src/flowmesh/models/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

from pydantic import BaseModel, Field, field_validator

from .common import TaskType


class CPUInfo(BaseModel):
logical_cores: int | None = None
Expand Down Expand Up @@ -80,7 +82,7 @@ class SSHLimits(BaseModel):


class WorkerCapabilities(BaseModel):
ssh: bool = False
supported_task_types: frozenset[TaskType] = Field(default_factory=frozenset)


class Worker(BaseModel):
Expand Down
4 changes: 1 addition & 3 deletions src/server/registries/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,9 +550,7 @@ def hw_satisfies(worker: Worker, task: TaskEnvelope) -> bool:


def capability_satisfies(worker: Worker, task: TaskEnvelope) -> bool:
if isinstance(task.spec, (SSHSpecStrict, SSHSpecTemplate)):
return worker.capabilities.ssh
return True
return task.spec.taskType in worker.capabilities.supported_task_types


def _gpu_meets_requirements(hw: WorkerHardware, gpu_req: GPURequirements) -> bool:
Expand Down
42 changes: 42 additions & 0 deletions src/server/task/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from shared.tasks import TaskEnvelopeTemplate, TaskType
from shared.tasks.components import TaskAnnotations
from shared.tasks.specs import InferenceBackend, InferenceSpecTemplate
from shared.utils import new_task_id, parse_bool_env
from shared.utils.json import safe_get

Expand Down Expand Up @@ -161,9 +162,50 @@ def _build_task_template(
) from exc
raise ValueError(f"Invalid task payload{context}: {exc}") from exc
_validate_ssh_access_mode(task, context)
_validate_inference_spec(task, context)
return task


def _validate_inference_spec(task: TaskEnvelopeTemplate, context: str) -> None:
"""Validate inference-task invariants."""
spec = task.spec
if not isinstance(spec, InferenceSpecTemplate):
return

model = spec.model
if model and model.adapters:
for adapter in model.adapters:
if not adapter.path and not adapter.url:
Comment thread
timzsu marked this conversation as resolved.
Outdated
raise ValueError(
f"Invalid task payload{context}: adapter "
f"{adapter.name or adapter.type!r} specifies neither path nor url "
"and cannot be loaded."
)

hardware = spec.resources.hardware if spec.resources else None
gpu = hardware.gpu if hardware else None

if spec.enforce_cpu is True:
if model and model.vllm:
raise ValueError(
f"Invalid task payload{context}: enforce_cpu is set but the model "
"configures a vLLM backend."
)
return

if isinstance(spec.enforce_cpu, str): # Unresolved template placeholder
Comment thread
timzsu marked this conversation as resolved.
Outdated
return
if spec.backend() is not InferenceBackend.VLLM:
return
if gpu and gpu.count is not None and gpu.count >= 1:
return
raise ValueError(
f"Invalid task payload{context}: inference task resolves to the vLLM "
"backend but requests no GPU. Set spec.resources.hardware.gpu.count >= 1, "
"or use enforce_cpu / a transformers-only model for CPU inference."
)


def _task_context_label(local_name: str | None, graph_node_name: str | None) -> str:
if graph_node_name:
return f" for graph node {graph_node_name!r}"
Expand Down
7 changes: 5 additions & 2 deletions src/shared/schemas/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from pydantic import BaseModel, Field

from shared.tasks.task_type import TaskType


class WorkerStatus(StrEnum):
UNKNOWN = "UNKNOWN"
Expand Down Expand Up @@ -33,8 +35,9 @@ class SSHLimits(BaseModel):
class WorkerCapabilities(BaseModel):
"""Task capabilities a worker advertises to the dispatcher."""

ssh: bool = Field(
default=False, description="Whether the worker can run SSH session tasks."
supported_task_types: frozenset[TaskType] = Field(
default_factory=frozenset,
description="Types of tasks this worker can service.",
)


Expand Down
14 changes: 0 additions & 14 deletions src/shared/tasks/components/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,6 @@ class ModelConfig(StrictBaseModel):
diffusers: dict[str, Any] | None = None
adapters: list[AdapterConfig] | None = None

def has_lora_adapter(self) -> bool:
adapters = self.adapters or []
for adapter in adapters:
if (adapter.type or "").strip().lower() == "lora":
return True
return False


class ModelConfigTemplate(TemplateBaseModel):
source: ModelSourceTemplate | None = None
Expand All @@ -91,10 +84,3 @@ class ModelConfigTemplate(TemplateBaseModel):
transformers: dict[str, Any] | None = None
diffusers: dict[str, Any] | None = None
adapters: list[AdapterConfigTemplate] | None = None

def has_lora_adapter(self) -> bool:
adapters = self.adapters or []
for adapter in adapters:
if (adapter.type or "").strip().lower() == "lora":
return True
return False
3 changes: 2 additions & 1 deletion src/shared/tasks/specs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
TaskSpecTemplateBase,
)
from .diffusion import DiffusionSpecStrict, DiffusionSpecTemplate
from .inference import InferenceSpecStrict, InferenceSpecTemplate
from .inference import InferenceBackend, InferenceSpecStrict, InferenceSpecTemplate
from .misc import (
AgentSpecStrict,
AgentSpecTemplate,
Expand Down Expand Up @@ -56,6 +56,7 @@
"DiffusionSpecTemplate",
"ImageClassificationTrainingSpecStrict",
"ImageClassificationTrainingSpecTemplate",
"InferenceBackend",
"InferenceSpecStrict",
"InferenceSpecTemplate",
"LoRASFTSpecStrict",
Expand Down
37 changes: 37 additions & 0 deletions src/shared/tasks/specs/inference.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from enum import StrEnum
from typing import Literal

from ..placeholders import TemplateBool, TemplateInt
Expand All @@ -10,17 +11,53 @@
)


class InferenceBackend(StrEnum):
"""The executor backend an inference task resolves to."""

TRANSFORMERS = "transformers"
VLLM = "vllm"
AUTO = "auto"


class InferenceSpecStrict(ModelInferSpecStrict):
taskType: Literal[TaskType.INFERENCE]

sloSeconds: int | None = None
parallel: ParallelSpec | None = None
enforce_cpu: bool | None = None

def backend(self) -> InferenceBackend:
return _inference_backend(self)


class InferenceSpecTemplate(ModelInferSpecTemplate):
taskType: Literal[TaskType.INFERENCE]

sloSeconds: TemplateInt | None = None
parallel: ParallelSpecTemplate | None = None
enforce_cpu: TemplateBool | None = None

def backend(self) -> InferenceBackend:
return _inference_backend(self)


def _inference_backend(
spec: InferenceSpecStrict | InferenceSpecTemplate,
) -> InferenceBackend:
"""Classify the executor backend an inference task resolves to.

The HF transformers executor runs on a GPU when one is available, so only ``VLLM``
(explicit vLLM config or adapters) strictly requires a GPU. ``AUTO`` is the unhinted
case: the runner prefers vLLM but falls back to the transformers executor when vLLM
is absent.
"""
if spec.enforce_cpu is True:
return InferenceBackend.TRANSFORMERS
model = spec.model
if model is None:
return InferenceBackend.AUTO
if model.vllm or model.adapters:
return InferenceBackend.VLLM
if model.transformers:
return InferenceBackend.TRANSFORMERS
return InferenceBackend.AUTO
9 changes: 5 additions & 4 deletions src/worker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,11 @@ The SSH session container is labelled with `flowmesh.ssh.task_id` and
`flowmesh.ssh.worker_id` so the server can clean it up if the worker
container is stopped externally.

At startup the worker initializes the SSH executor only if the Docker daemon is
reachable, and advertises its `ssh` capability accordingly; the dispatcher then
selects it for SSH tasks only when it can actually run them. (The supervisor
mounts the Docker socket only when SSH is enabled for the node; see
At startup the worker advertises the task types it can service, and the
dispatcher only routes a task to a worker that advertises its type. A worker
advertises a type only when its executor came up: SSH requires a reachable
Docker daemon, so `ssh` is offered only on Docker-capable workers. (The
supervisor mounts the Docker socket only when SSH is enabled for the node; see
`ENABLE_SSH_BY_DEFAULT` below.)

Relevant env vars for SSH tasks:
Expand Down
2 changes: 2 additions & 0 deletions src/worker/executors/agent_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from shared.schemas.artifact import ArtifactRef
from shared.schemas.result import BaseExecutorResult
from shared.tasks.specs import AgentSpecStrict
from shared.tasks.task_type import TaskType

from .base_executor import ExecutionError, Executor, ExecutorTask
from .utils.checkpoints import maybe_upload_artifacts, write_executor_result
Expand Down Expand Up @@ -66,6 +67,7 @@ class AgentExecutor(Executor):
"""Agent executor using youtu-agent (utu) framework"""

name = "agent"
supported_task_types = frozenset({TaskType.AGENT})

def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
Expand Down
2 changes: 2 additions & 0 deletions src/worker/executors/api_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from shared.schemas.result import BaseExecutorResult
from shared.tasks.specs import ApiSpecStrict
from shared.tasks.task_type import TaskType

from .base_executor import ExecutionError, Executor, ExecutorTask

Expand Down Expand Up @@ -40,6 +41,7 @@ class APIExecutor(Executor):
"""

name = "api"
supported_task_types = frozenset({TaskType.API})

# ---- Class-level connection pool (shared across all instances) ----
_clients: ClassVar[dict[_ClientKey, httpx.Client]] = {}
Expand Down
8 changes: 6 additions & 2 deletions src/worker/executors/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ def run(self, task: ExecutorTask, out_dir: Path) -> MyResult:
import json
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, TypeVar
from typing import Any, ClassVar, TypeVar

from shared.schemas.result import BaseExecutorResult
from shared.tasks import MergedChildTaskStrict
from shared.tasks.specs import TaskSpecStrictBase
from shared.tasks.task_type import TaskType
from shared.tasks.worker_message import WorkerHardware, WorkerTaskMessage
from worker.config import WorkerConfig
from worker.lifecycle import Lifecycle
Expand Down Expand Up @@ -67,8 +68,10 @@ class Executor(ABC):
Subclasses must implement `run` and may override `prepare` and `teardown`.
"""

#: Human-readable identifier for logging/telemetry
name: str = "executor"
"""Human-readable identifier for logging/telemetry"""
supported_task_types: ClassVar[frozenset[TaskType]] = frozenset()
"""Types of tasks this executor can service"""

def __init__(
self,
Expand Down Expand Up @@ -172,6 +175,7 @@ class EchoResult(BaseExecutorResult):

class EchoExecutor(Executor):
name = "echo"
supported_task_types = frozenset({TaskType.ECHO})

def run(self, task: ExecutorTask, out_dir: Path) -> EchoResult:
return EchoResult(
Expand Down
2 changes: 2 additions & 0 deletions src/worker/executors/data_profiling_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from shared.schemas.result import BaseExecutorResult
from shared.tasks.specs import DataProfilingSpecStrict
from shared.tasks.task_type import TaskType
from shared.utils.json import to_json_serializable, validate_keys

from ..connectors import get_connector_from_spec
Expand All @@ -31,6 +32,7 @@ class DataProfilingExecutor(DataMixin, Executor):
"""Executor that estimates SQL query costs by sampling SQL template params."""

name = "data_profiling"
supported_task_types = frozenset({TaskType.DATA_PROFILING})

def run(self, task: ExecutorTask, out_dir: Path) -> DataProfilingResult:
spec = self.require_spec(task, DataProfilingSpecStrict)
Expand Down
2 changes: 2 additions & 0 deletions src/worker/executors/data_retrieval_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from shared.schemas.artifact import ArtifactRef
from shared.schemas.result import BaseExecutorResult
from shared.tasks.specs import DataRetrievalSpecStrict
from shared.tasks.task_type import TaskType
from shared.utils.json import validate_keys

from ..connectors import LumidDataConnector, PostgreSQLConnector, S3Connector
Expand All @@ -40,6 +41,7 @@ class DataRetrievalResult(BaseExecutorResult):

class DataRetrievalExecutor(DataMixin, Executor):
name = "data_retrieval"
supported_task_types = frozenset({TaskType.DATA_RETRIEVAL})

def run(self, task: ExecutorTask, out_dir: Path) -> DataRetrievalResult:
spec = self.require_spec(task, DataRetrievalSpecStrict)
Expand Down
2 changes: 2 additions & 0 deletions src/worker/executors/diffusers_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from shared.schemas.artifact import ArtifactRef
from shared.schemas.result import BaseExecutorResult
from shared.tasks.specs import DiffusionSpecStrict
from shared.tasks.task_type import TaskType

from ..utils.logging import configure_hf_library_logging
from .base_executor import ExecutionError, Executor, ExecutorTask
Expand Down Expand Up @@ -54,6 +55,7 @@ class DiffusersExecutor(DataMixin, Executor):
"""Executor that runs text-to-image generation via Hugging Face Diffusers."""

name = "diffusers"
supported_task_types = frozenset({TaskType.DIFFUSION})

def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
Expand Down
2 changes: 2 additions & 0 deletions src/worker/executors/dpo_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from shared.schemas.artifact import ArtifactRef
from shared.schemas.result import BaseExecutorResult
from shared.tasks.specs import DPOSpecStrict
from shared.tasks.task_type import TaskType
from shared.utils.manifest import scratch_dir

from ..utils.logging import configure_hf_library_logging
Expand Down Expand Up @@ -62,6 +63,7 @@ class DPOExecutor(TrainingMixin, Executor):
"""DPO training executor using TRL library."""

name = "dpo_executor"
supported_task_types = frozenset({TaskType.DPO})

def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
Expand Down
2 changes: 2 additions & 0 deletions src/worker/executors/echo_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from shared.schemas.result import BaseExecutorResult
from shared.tasks.specs import EchoSpecStrict
from shared.tasks.task_type import TaskType

from .base_executor import ExecutionError, Executor, ExecutorTask
from .mixins.data import DataMixin
Expand All @@ -22,6 +23,7 @@ class EchoResult(BaseExecutorResult):

class EchoExecutor(DataMixin, Executor):
name = "echo"
supported_task_types = frozenset({TaskType.ECHO})

def _append_outputs(self, out_items: list[dict[str, Any]], value: Any) -> None:
if isinstance(value, list):
Expand Down
Loading