Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions providers/edge3/docs/architecture.rst
Original file line number Diff line number Diff line change
Expand Up @@ -156,17 +156,11 @@ The following features are known missing and will be implemented in increments:

- Overview about queues / jobs per queue
- Allow starting Edge Worker REST API separate to api-server
- Add some hints how to setup an additional worker

- Edge Worker CLI

- Use WebSockets instead of HTTP calls for communication
- Send logs also to TaskFileHandler if external logging services are used
- Integration into telemetry to send metrics from remote site
- Publish system metrics with heartbeats (CPU, Disk space, RAM, Load)
- Be more liberal e.g. on patch version. Currently requires exact version match
(In current state if versions do not match, the worker will gracefully shut
down when jobs are completed, no new jobs will be started)

- Tests

Expand Down
40 changes: 40 additions & 0 deletions providers/edge3/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,46 @@ config:
type: string
default: ~
example: ~
minimum_acceptable_core_version_for_workers:
description: |
The minimum acceptable version of the central Airflow site and edge worker to work with.

This is used to make sure that the central Airflow site and edge workers are compatible with each
other. When an edge worker connects to the central site, it reports its installed airflow
core version. If the version is lower than the minimum acceptable version, the central site will
notify the worker during heartbeat and raise an alert. The worker will gracefully terminate and
drain jobs and shut down.

If you know your workers are still compatible with older versions of the central site, you can set
this to a lower version to avoid having to update all workers at the same time as the central site.
Default is to force matching versions, so relaxing is optional to be configured.

If the airflow core version between edge worker and central site is not matchning, the worker will
report warning status.
version_added: 3.6.0
type: string
default: ~
example: "3.2.1"
minimum_acceptable_edge_version_for_workers:
description: |
The minimum acceptable version of the edge worker to work with the central site.

This is used to make sure that the central Airflow site and edge workers are compatible with each
other. When an edge worker connects to the central site, it reports its edge version. If the
version is lower than the minimum acceptable version, the central site will notify the worker
during heartbeat and raise an alert. The worker will gracefully terminate and drain jobs and shut
down.

If you know your edge workers are still compatible with older versions of the central site, you
can set this to a lower version to avoid having to update all workers at the same time as the
central site. Default is to force matching versions, so relaxing is optional to be configured.

If the edge version between edge worker and central site is not matchning, the worker will report
warning status.
version_added: 3.6.0
type: string
default: ~
example: "3.5.0"
extended_system_info_function:
description: |
The function to call to get extended system information for the worker.
Expand Down
23 changes: 21 additions & 2 deletions providers/edge3/src/airflow/providers/edge3/cli/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ class EdgeWorker:
"""Flag if job processing should be completed and no new jobs fetched for maintenance mode. """
maintenance_comments: str | None = None
"""Comments for maintenance mode."""
versions_match: bool = True
"""Whether the worker and the server have matching versions of Airflow and the Edge Provider."""
background_tasks: set[Task] = set()

def __init__(
Expand Down Expand Up @@ -327,7 +329,22 @@ async def _enforce_drain_timeout(self) -> bool:
async def _get_sysinfo(self) -> dict[str, str | int | float | datetime]:
"""Produce the sysinfo from worker to post to central site."""
sysinfo: dict[str, str | int | float | datetime] = {
"status": logging.INFO,
**(
{
"status": logging.INFO,
}
if self.versions_match
else {
"status": logging.WARNING,
"status_text": "Version mismatch on Edge Worker",
Comment thread
jscheffl marked this conversation as resolved.
Outdated
"version_mismatch_description": "The version between the Edge Worker and the "
"Airflow Core is not matching for either the edge or airflow package version. "
"Please check if the Edge Provider version is compatible with your Airflow "
"version. The worker will still operate but you might miss some features or "
"have issues. Please consider upgrading the Edge Provider to a compatible "
"version.",
}
),
"airflow_version": airflow_version,
"edge_provider_version": edge_provider_version,
"python_version": sys.version,
Expand Down Expand Up @@ -431,13 +448,14 @@ async def _push_logs_in_chunks(self, job: Job):
async def start(self):
"""Start the execution in a loop until terminated."""
try:
await worker_register(
register_result = await worker_register(
self.hostname,
EdgeWorkerState.STARTING,
self.queues,
await self._get_sysinfo(),
self.team_name,
)
self.versions_match = register_result.versions_match
except EdgeWorkerVersionException as e:
logger.info("Version mismatch of Edge worker and Core. Shutting down worker.")
raise SystemExit(str(e))
Expand Down Expand Up @@ -590,6 +608,7 @@ async def heartbeat(self, new_maintenance_comments: str | None = None) -> bool:
new_maintenance_comments,
team_name=self.team_name,
)
self.versions_match = worker_info.versions_match
self.queues = worker_info.queues
if worker_info.concurrency is not None and worker_info.concurrency != self.concurrency:
logger.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,20 @@ def get_provider_info():
"default": None,
"example": None,
},
"minimum_acceptable_core_version_for_workers": {
"description": "The minimum acceptable version of the central Airflow site and edge worker to work with.\n\nThis is used to make sure that the central Airflow site and edge workers are compatible with each\nother. When an edge worker connects to the central site, it reports its installed airflow\ncore version. If the version is lower than the minimum acceptable version, the central site will\nnotify the worker during heartbeat and raise an alert. The worker will gracefully terminate and\ndrain jobs and shut down.\n\nIf you know your workers are still compatible with older versions of the central site, you can set\nthis to a lower version to avoid having to update all workers at the same time as the central site.\nDefault is to force matching versions, so relaxing is optional to be configured.\n\nIf the airflow core version between edge worker and central site is not matchning, the worker will\nreport warning status.\n",
"version_added": "3.6.0",
"type": "string",
"default": None,
"example": "3.2.1",
},
"minimum_acceptable_edge_version_for_workers": {
"description": "The minimum acceptable version of the edge worker to work with the central site.\n\nThis is used to make sure that the central Airflow site and edge workers are compatible with each\nother. When an edge worker connects to the central site, it reports its edge version. If the\nversion is lower than the minimum acceptable version, the central site will notify the worker\nduring heartbeat and raise an alert. The worker will gracefully terminate and drain jobs and shut\ndown.\n\nIf you know your edge workers are still compatible with older versions of the central site, you\ncan set this to a lower version to avoid having to update all workers at the same time as the\ncentral site. Default is to force matching versions, so relaxing is optional to be configured.\n\nIf the edge version between edge worker and central site is not matchning, the worker will report\nwarning status.\n",
"version_added": "3.6.0",
"type": "string",
"default": None,
"example": "3.5.0",
},
"extended_system_info_function": {
"description": "The function to call to get extended system information for the worker.\n\nThe function must be async and return a ``dict[str, str | int | float | datetime]``.\nThe information will be sent to the central site with each heartbeat and can be used for monitoring\nand debugging purposes. All int and float values will also be published to metric collection systems\nlike statsd or otel.\n\nFunction must be provided as a string with the full path to the function. See\nhttps://github.com/apache/airflow/blob/main/providers/edge3/src/airflow/providers/edge3/cli/example_extended_sysinfo.py\nfor an example implementation.\n",
"version_added": "3.5.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,14 @@ class WorkerRegistrationReturn(BaseModel):
"""The return class for the worker registration."""

last_update: Annotated[datetime, Field(description="Time of the last update of the worker.")]
versions_match: Annotated[
bool,
Field(
description="Whether the worker and the server have matching versions of Airflow and the Edge Provider. "
"If False, the worker version is not matching and might need to be upgraded. But version is still "
"compatible enough to work. If True, worker and server versions match.",
),
] = False # If not explicitly given assume it is not compatible


class WorkerSetStateReturn(BaseModel):
Expand All @@ -215,3 +223,11 @@ class WorkerSetStateReturn(BaseModel):
"None means no remote override; the worker uses its startup value.",
),
] = None
versions_match: Annotated[
bool,
Field(
description="Whether the worker and the server have matching versions of Airflow and the Edge Provider. "
"If False, the worker version is not matching and might need to be upgraded. But version is still "
"compatible enough to work. If True, worker and server versions match.",
),
] = False # If not explicitly given assume it is not compatible
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@
from typing import Annotated

from fastapi import Body, Depends, HTTPException, Path, status
from packaging.version import Version
from sqlalchemy import select

from airflow import __version__ as airflow_version
from airflow.api_fastapi.common.db.common import SessionDep # noqa: TC001
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.providers.common.compat.sdk import Stats, timezone
from airflow.providers.common.compat.sdk import Stats, conf, timezone
from airflow.providers.edge3 import __version__ as edge_provider_version
from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel, EdgeWorkerState, set_metrics
from airflow.providers.edge3.worker_api.auth import jwt_token_authorization_rest
from airflow.providers.edge3.worker_api.datamodels import (
Expand All @@ -49,38 +52,55 @@
)


def _assert_version(sysinfo: dict[str, str | int | float | datetime]) -> None:
"""Check if the Edge Worker version matches the central API site."""
from airflow import __version__ as airflow_version
from airflow.providers.edge3 import __version__ as edge_provider_version
def _version(version_str: str) -> tuple[int, int, int]:
"""Convert a version string into a tuple of integers for comparison."""
version = Version(version_str)
return version.major, version.minor, version.micro


# Note: In future, more stable versions we might be more liberate, for the
# moment we require exact version match for Edge Worker and core version
def _assert_version(sysinfo: dict[str, str | int | float | datetime]) -> bool:
"""Check if the Edge Worker version matches the central API site."""
versions_match = True
if "airflow_version" in sysinfo:
airflow_on_worker = sysinfo["airflow_version"]
airflow_on_worker: str = sysinfo["airflow_version"] # type: ignore
if airflow_on_worker != airflow_version:
raise HTTPException(
status.HTTP_400_BAD_REQUEST,
f"Edge Worker runs on Airflow {airflow_on_worker} "
f"and the core runs on {airflow_version}. Rejecting access due to difference.",
versions_match = False
minimum_acceptable_core_version_for_workers = conf.get(
"edge", "minimum_acceptable_core_version_for_workers", fallback=None
)
if not minimum_acceptable_core_version_for_workers or _version(airflow_on_worker) < _version(
minimum_acceptable_core_version_for_workers
):
raise HTTPException(
status.HTTP_400_BAD_REQUEST,
f"Edge Worker runs on Airflow {airflow_on_worker} which is below the minimum acceptable version "
f"{minimum_acceptable_core_version_for_workers}. Rejecting access due to incompatible version.",
)
else:
raise HTTPException(
status.HTTP_400_BAD_REQUEST, "Edge Worker does not specify the version it is running on."
)

if "edge_provider_version" in sysinfo:
provider_on_worker = sysinfo["edge_provider_version"]
provider_on_worker: str = sysinfo["edge_provider_version"] # type: ignore
if provider_on_worker != edge_provider_version:
raise HTTPException(
status.HTTP_400_BAD_REQUEST,
f"Edge Worker runs on Edge Provider {provider_on_worker} "
f"and the core runs on {edge_provider_version}. Rejecting access due to difference.",
versions_match = False
minimum_acceptable_edge_version_for_workers = conf.get(
"edge", "minimum_acceptable_edge_version_for_workers", fallback=None
)
if not minimum_acceptable_edge_version_for_workers or _version(provider_on_worker) < _version(
minimum_acceptable_edge_version_for_workers
):
raise HTTPException(
status.HTTP_400_BAD_REQUEST,
f"Edge Worker runs on Edge Provider {provider_on_worker} "
f"and the core runs on {edge_provider_version}. Rejecting access due to difference.",
)
else:
raise HTTPException(
status.HTTP_400_BAD_REQUEST, "Edge Worker does not specify the provider version it is running on."
)
return versions_match


_worker_name_doc = Path(title="Worker Name", description="Hostname or instance name of the worker")
Expand Down Expand Up @@ -164,7 +184,7 @@ def register(
session: SessionDep,
) -> WorkerRegistrationReturn:
"""Register a new worker to the backend."""
_assert_version(body.sysinfo)
versions_match = _assert_version(body.sysinfo)
query = select(EdgeWorkerModel).where(EdgeWorkerModel.worker_name == worker_name)
worker: EdgeWorkerModel | None = session.scalar(query)
if not worker:
Expand Down Expand Up @@ -193,7 +213,7 @@ def register(
worker.last_update = timezone.utcnow()
worker.team_name = body.team_name
session.add(worker)
return WorkerRegistrationReturn(last_update=worker.last_update)
return WorkerRegistrationReturn(last_update=worker.last_update, versions_match=versions_match)


@worker_router.patch("/{worker_name}", dependencies=[Depends(jwt_token_authorization_rest)])
Expand Down Expand Up @@ -227,12 +247,13 @@ def set_state(
queues=worker.queues,
sysinfo=body.sysinfo,
)
_assert_version(body.sysinfo) # Exception only after worker state is in the DB
versions_match = _assert_version(body.sysinfo) # Exception only after worker state is in the DB
return WorkerSetStateReturn(
state=worker.state,
queues=worker.queues,
maintenance_comments=worker.maintenance_comment,
concurrency=worker.concurrency,
versions_match=versions_match,
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1453,6 +1453,14 @@ components:
format: date-time
title: Last Update
description: Time of the last update of the worker.
versions_match:
type: boolean
title: Versions Match
description: Whether the worker and the server have matching versions of
Airflow and the Edge Provider. If False, the worker version is not matching
and might need to be upgraded. But version is still compatible enough
to work. If True, worker and server versions match.
default: false
type: object
required:
- last_update
Expand Down Expand Up @@ -1485,6 +1493,14 @@ components:
title: Concurrency
description: Desired concurrency for the worker set by an administrator.
None means no remote override; the worker uses its startup value.
versions_match:
type: boolean
title: Versions Match
description: Whether the worker and the server have matching versions of
Airflow and the Edge Provider. If False, the worker version is not matching
and might need to be upgraded. But version is still compatible enough
to work. If True, worker and server versions match.
default: false
type: object
required:
- state
Expand Down
16 changes: 15 additions & 1 deletion providers/edge3/tests/unit/edge3/cli/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import contextlib
import importlib
import json
import logging
import multiprocessing
import signal
from datetime import datetime
Expand Down Expand Up @@ -683,10 +684,23 @@ async def test_get_sysinfo(self, worker_with_job: EdgeWorker):
assert "concurrency" in sysinfo
assert "worker_start_time" in sysinfo
assert sysinfo["worker_start_time"] == worker_with_job.worker_start_time
assert "status" in sysinfo
assert sysinfo["status"] == logging.INFO
assert "status_text" not in sysinfo # is only defined if extended sysinfo provides this field
assert sysinfo["concurrency"] == concurrency

@pytest.mark.asyncio
async def test_get_sysinfo_version_mismatch(self, worker_with_job: EdgeWorker):
worker_with_job.versions_match = False # Simulate version mismatch to verify sysinfo
sysinfo = await worker_with_job._get_sysinfo()
assert "airflow_version" in sysinfo
assert "edge_provider_version" in sysinfo
assert "python_version" in sysinfo
assert "concurrency" in sysinfo
assert "worker_start_time" in sysinfo
assert sysinfo["worker_start_time"] == worker_with_job.worker_start_time
assert sysinfo["status"] == logging.WARNING
assert "status_text" in sysinfo

@pytest.mark.asyncio
async def test_get_sysinfo_extended(self, worker_with_job_and_sysinfo: EdgeWorker):
concurrency = 42
Expand Down
Loading
Loading