diff --git a/providers/edge3/docs/architecture.rst b/providers/edge3/docs/architecture.rst index e9e586183cf86..688ee372844fd 100644 --- a/providers/edge3/docs/architecture.rst +++ b/providers/edge3/docs/architecture.rst @@ -156,17 +156,11 @@ The following features are known missing and will be implemented in increments: - Overview about queues / jobs per queue - Allow starting Edge Worker REST API separate to api-server - - Add some hints how to setup an additional worker - Edge Worker CLI - Use WebSockets instead of HTTP calls for communication - Send logs also to TaskFileHandler if external logging services are used - - Integration into telemetry to send metrics from remote site - - Publish system metrics with heartbeats (CPU, Disk space, RAM, Load) - - Be more liberal e.g. on patch version. Currently requires exact version match - (In current state if versions do not match, the worker will gracefully shut - down when jobs are completed, no new jobs will be started) - Tests diff --git a/providers/edge3/provider.yaml b/providers/edge3/provider.yaml index c160b23b278fd..4dab4e1c9c8b5 100644 --- a/providers/edge3/provider.yaml +++ b/providers/edge3/provider.yaml @@ -192,6 +192,46 @@ config: type: string default: ~ example: ~ + minimum_acceptable_core_version_for_workers: + description: | + The minimum acceptable version of the central Airflow site and edge worker to work with. + + This is used to make sure that the central Airflow site and edge workers are compatible with each + other. When an edge worker connects to the central site, it reports its installed airflow + core version. If the version is lower than the minimum acceptable version, the central site will + notify the worker during heartbeat and raise an alert. The worker will gracefully terminate and + drain jobs and shut down. + + If you know your workers are still compatible with older versions of the central site, you can set + this to a lower version to avoid having to update all workers at the same time as the central site. + Default is to force matching versions, so relaxing is optional to be configured. + + If the airflow core version between edge worker and central site is not matchning, the worker will + report warning status. + version_added: 3.6.0 + type: string + default: ~ + example: "3.2.1" + minimum_acceptable_edge_version_for_workers: + description: | + The minimum acceptable version of the edge worker to work with the central site. + + This is used to make sure that the central Airflow site and edge workers are compatible with each + other. When an edge worker connects to the central site, it reports its edge version. If the + version is lower than the minimum acceptable version, the central site will notify the worker + during heartbeat and raise an alert. The worker will gracefully terminate and drain jobs and shut + down. + + If you know your edge workers are still compatible with older versions of the central site, you + can set this to a lower version to avoid having to update all workers at the same time as the + central site. Default is to force matching versions, so relaxing is optional to be configured. + + If the edge version between edge worker and central site is not matchning, the worker will report + warning status. + version_added: 3.6.0 + type: string + default: ~ + example: "3.5.0" extended_system_info_function: description: | The function to call to get extended system information for the worker. diff --git a/providers/edge3/src/airflow/providers/edge3/cli/worker.py b/providers/edge3/src/airflow/providers/edge3/cli/worker.py index 2fa261b74f978..510b456882031 100644 --- a/providers/edge3/src/airflow/providers/edge3/cli/worker.py +++ b/providers/edge3/src/airflow/providers/edge3/cli/worker.py @@ -114,6 +114,8 @@ class EdgeWorker: """Flag if job processing should be completed and no new jobs fetched for maintenance mode. """ maintenance_comments: str | None = None """Comments for maintenance mode.""" + versions_match: bool = True + """Whether the worker and the server have matching versions of Airflow and the Edge Provider.""" background_tasks: set[Task] = set() def __init__( @@ -327,7 +329,22 @@ async def _enforce_drain_timeout(self) -> bool: async def _get_sysinfo(self) -> dict[str, str | int | float | datetime]: """Produce the sysinfo from worker to post to central site.""" sysinfo: dict[str, str | int | float | datetime] = { - "status": logging.INFO, + **( + { + "status": logging.INFO, + } + if self.versions_match + else { + "status": logging.WARNING, + "status_text": "Healthy but version mismatch", + "version_mismatch_description": "The version between the Edge Worker and the " + "Airflow Core is not matching for either the edge or airflow package version. " + "Please check if the Edge Provider version is compatible with your Airflow " + "version. The worker will still operate but you might miss some features or " + "have issues. Please consider upgrading the Edge Provider to a compatible " + "version.", + } + ), "airflow_version": airflow_version, "edge_provider_version": edge_provider_version, "python_version": sys.version, @@ -431,13 +448,14 @@ async def _push_logs_in_chunks(self, job: Job): async def start(self): """Start the execution in a loop until terminated.""" try: - await worker_register( + register_result = await worker_register( self.hostname, EdgeWorkerState.STARTING, self.queues, await self._get_sysinfo(), self.team_name, ) + self.versions_match = register_result.versions_match except EdgeWorkerVersionException as e: logger.info("Version mismatch of Edge worker and Core. Shutting down worker.") raise SystemExit(str(e)) @@ -590,6 +608,7 @@ async def heartbeat(self, new_maintenance_comments: str | None = None) -> bool: new_maintenance_comments, team_name=self.team_name, ) + self.versions_match = worker_info.versions_match self.queues = worker_info.queues if worker_info.concurrency is not None and worker_info.concurrency != self.concurrency: logger.info( diff --git a/providers/edge3/src/airflow/providers/edge3/get_provider_info.py b/providers/edge3/src/airflow/providers/edge3/get_provider_info.py index 7e0085982b286..53ce9805dedc3 100644 --- a/providers/edge3/src/airflow/providers/edge3/get_provider_info.py +++ b/providers/edge3/src/airflow/providers/edge3/get_provider_info.py @@ -123,6 +123,20 @@ def get_provider_info(): "default": None, "example": None, }, + "minimum_acceptable_core_version_for_workers": { + "description": "The minimum acceptable version of the central Airflow site and edge worker to work with.\n\nThis is used to make sure that the central Airflow site and edge workers are compatible with each\nother. When an edge worker connects to the central site, it reports its installed airflow\ncore version. If the version is lower than the minimum acceptable version, the central site will\nnotify the worker during heartbeat and raise an alert. The worker will gracefully terminate and\ndrain jobs and shut down.\n\nIf you know your workers are still compatible with older versions of the central site, you can set\nthis to a lower version to avoid having to update all workers at the same time as the central site.\nDefault is to force matching versions, so relaxing is optional to be configured.\n\nIf the airflow core version between edge worker and central site is not matchning, the worker will\nreport warning status.\n", + "version_added": "3.6.0", + "type": "string", + "default": None, + "example": "3.2.1", + }, + "minimum_acceptable_edge_version_for_workers": { + "description": "The minimum acceptable version of the edge worker to work with the central site.\n\nThis is used to make sure that the central Airflow site and edge workers are compatible with each\nother. When an edge worker connects to the central site, it reports its edge version. If the\nversion is lower than the minimum acceptable version, the central site will notify the worker\nduring heartbeat and raise an alert. The worker will gracefully terminate and drain jobs and shut\ndown.\n\nIf you know your edge workers are still compatible with older versions of the central site, you\ncan set this to a lower version to avoid having to update all workers at the same time as the\ncentral site. Default is to force matching versions, so relaxing is optional to be configured.\n\nIf the edge version between edge worker and central site is not matchning, the worker will report\nwarning status.\n", + "version_added": "3.6.0", + "type": "string", + "default": None, + "example": "3.5.0", + }, "extended_system_info_function": { "description": "The function to call to get extended system information for the worker.\n\nThe function must be async and return a ``dict[str, str | int | float | datetime]``.\nThe information will be sent to the central site with each heartbeat and can be used for monitoring\nand debugging purposes. All int and float values will also be published to metric collection systems\nlike statsd or otel.\n\nFunction must be provided as a string with the full path to the function. See\nhttps://github.com/apache/airflow/blob/main/providers/edge3/src/airflow/providers/edge3/cli/example_extended_sysinfo.py\nfor an example implementation.\n", "version_added": "3.5.0", diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/datamodels.py b/providers/edge3/src/airflow/providers/edge3/worker_api/datamodels.py index aefc1af6119a9..fb947ee500329 100644 --- a/providers/edge3/src/airflow/providers/edge3/worker_api/datamodels.py +++ b/providers/edge3/src/airflow/providers/edge3/worker_api/datamodels.py @@ -192,6 +192,14 @@ class WorkerRegistrationReturn(BaseModel): """The return class for the worker registration.""" last_update: Annotated[datetime, Field(description="Time of the last update of the worker.")] + versions_match: Annotated[ + bool, + Field( + description="Whether the worker and the server have matching versions of Airflow and the Edge Provider. " + "If False, the worker version is not matching and might need to be upgraded. But version is still " + "compatible enough to work. If True, worker and server versions match.", + ), + ] = False # If not explicitly given assume it is not compatible class WorkerSetStateReturn(BaseModel): @@ -215,3 +223,11 @@ class WorkerSetStateReturn(BaseModel): "None means no remote override; the worker uses its startup value.", ), ] = None + versions_match: Annotated[ + bool, + Field( + description="Whether the worker and the server have matching versions of Airflow and the Edge Provider. " + "If False, the worker version is not matching and might need to be upgraded. But version is still " + "compatible enough to work. If True, worker and server versions match.", + ), + ] = False # If not explicitly given assume it is not compatible diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py index 4132be87c44a0..fca49c987f5c1 100644 --- a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py +++ b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py @@ -21,12 +21,15 @@ from typing import Annotated from fastapi import Body, Depends, HTTPException, Path, status +from packaging.version import Version from sqlalchemy import select +from airflow import __version__ as airflow_version from airflow.api_fastapi.common.db.common import SessionDep # noqa: TC001 from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc -from airflow.providers.common.compat.sdk import Stats, timezone +from airflow.providers.common.compat.sdk import Stats, conf, timezone +from airflow.providers.edge3 import __version__ as edge_provider_version from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel, EdgeWorkerState, set_metrics from airflow.providers.edge3.worker_api.auth import jwt_token_authorization_rest from airflow.providers.edge3.worker_api.datamodels import ( @@ -49,38 +52,55 @@ ) -def _assert_version(sysinfo: dict[str, str | int | float | datetime]) -> None: - """Check if the Edge Worker version matches the central API site.""" - from airflow import __version__ as airflow_version - from airflow.providers.edge3 import __version__ as edge_provider_version +def _version(version_str: str) -> tuple[int, int, int]: + """Convert a version string into a tuple of integers for comparison.""" + version = Version(version_str) + return version.major, version.minor, version.micro + - # Note: In future, more stable versions we might be more liberate, for the - # moment we require exact version match for Edge Worker and core version +def _assert_version(sysinfo: dict[str, str | int | float | datetime]) -> bool: + """Check if the Edge Worker version matches the central API site.""" + versions_match = True if "airflow_version" in sysinfo: - airflow_on_worker = sysinfo["airflow_version"] + airflow_on_worker: str = sysinfo["airflow_version"] # type: ignore if airflow_on_worker != airflow_version: - raise HTTPException( - status.HTTP_400_BAD_REQUEST, - f"Edge Worker runs on Airflow {airflow_on_worker} " - f"and the core runs on {airflow_version}. Rejecting access due to difference.", + versions_match = False + minimum_acceptable_core_version_for_workers = conf.get( + "edge", "minimum_acceptable_core_version_for_workers", fallback=None ) + if not minimum_acceptable_core_version_for_workers or _version(airflow_on_worker) < _version( + minimum_acceptable_core_version_for_workers + ): + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + f"Edge Worker runs on Airflow {airflow_on_worker} which is below the minimum acceptable version " + f"{minimum_acceptable_core_version_for_workers}. Rejecting access due to incompatible version.", + ) else: raise HTTPException( status.HTTP_400_BAD_REQUEST, "Edge Worker does not specify the version it is running on." ) if "edge_provider_version" in sysinfo: - provider_on_worker = sysinfo["edge_provider_version"] + provider_on_worker: str = sysinfo["edge_provider_version"] # type: ignore if provider_on_worker != edge_provider_version: - raise HTTPException( - status.HTTP_400_BAD_REQUEST, - f"Edge Worker runs on Edge Provider {provider_on_worker} " - f"and the core runs on {edge_provider_version}. Rejecting access due to difference.", + versions_match = False + minimum_acceptable_edge_version_for_workers = conf.get( + "edge", "minimum_acceptable_edge_version_for_workers", fallback=None ) + if not minimum_acceptable_edge_version_for_workers or _version(provider_on_worker) < _version( + minimum_acceptable_edge_version_for_workers + ): + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + f"Edge Worker runs on Edge Provider {provider_on_worker} " + f"and the core runs on {edge_provider_version}. Rejecting access due to difference.", + ) else: raise HTTPException( status.HTTP_400_BAD_REQUEST, "Edge Worker does not specify the provider version it is running on." ) + return versions_match _worker_name_doc = Path(title="Worker Name", description="Hostname or instance name of the worker") @@ -164,7 +184,7 @@ def register( session: SessionDep, ) -> WorkerRegistrationReturn: """Register a new worker to the backend.""" - _assert_version(body.sysinfo) + versions_match = _assert_version(body.sysinfo) query = select(EdgeWorkerModel).where(EdgeWorkerModel.worker_name == worker_name) worker: EdgeWorkerModel | None = session.scalar(query) if not worker: @@ -193,7 +213,7 @@ def register( worker.last_update = timezone.utcnow() worker.team_name = body.team_name session.add(worker) - return WorkerRegistrationReturn(last_update=worker.last_update) + return WorkerRegistrationReturn(last_update=worker.last_update, versions_match=versions_match) @worker_router.patch("/{worker_name}", dependencies=[Depends(jwt_token_authorization_rest)]) @@ -227,12 +247,13 @@ def set_state( queues=worker.queues, sysinfo=body.sysinfo, ) - _assert_version(body.sysinfo) # Exception only after worker state is in the DB + versions_match = _assert_version(body.sysinfo) # Exception only after worker state is in the DB return WorkerSetStateReturn( state=worker.state, queues=worker.queues, maintenance_comments=worker.maintenance_comment, concurrency=worker.concurrency, + versions_match=versions_match, ) diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml b/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml index b9388b6b6c9d6..01c8149d1dad8 100644 --- a/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml +++ b/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml @@ -1453,6 +1453,14 @@ components: format: date-time title: Last Update description: Time of the last update of the worker. + versions_match: + type: boolean + title: Versions Match + description: Whether the worker and the server have matching versions of + Airflow and the Edge Provider. If False, the worker version is not matching + and might need to be upgraded. But version is still compatible enough + to work. If True, worker and server versions match. + default: false type: object required: - last_update @@ -1485,6 +1493,14 @@ components: title: Concurrency description: Desired concurrency for the worker set by an administrator. None means no remote override; the worker uses its startup value. + versions_match: + type: boolean + title: Versions Match + description: Whether the worker and the server have matching versions of + Airflow and the Edge Provider. If False, the worker version is not matching + and might need to be upgraded. But version is still compatible enough + to work. If True, worker and server versions match. + default: false type: object required: - state diff --git a/providers/edge3/tests/unit/edge3/cli/test_worker.py b/providers/edge3/tests/unit/edge3/cli/test_worker.py index 933793a4f068d..ced124285023e 100644 --- a/providers/edge3/tests/unit/edge3/cli/test_worker.py +++ b/providers/edge3/tests/unit/edge3/cli/test_worker.py @@ -20,6 +20,7 @@ import contextlib import importlib import json +import logging import multiprocessing import signal from datetime import datetime @@ -683,10 +684,23 @@ async def test_get_sysinfo(self, worker_with_job: EdgeWorker): assert "concurrency" in sysinfo assert "worker_start_time" in sysinfo assert sysinfo["worker_start_time"] == worker_with_job.worker_start_time - assert "status" in sysinfo + assert sysinfo["status"] == logging.INFO assert "status_text" not in sysinfo # is only defined if extended sysinfo provides this field assert sysinfo["concurrency"] == concurrency + @pytest.mark.asyncio + async def test_get_sysinfo_version_mismatch(self, worker_with_job: EdgeWorker): + worker_with_job.versions_match = False # Simulate version mismatch to verify sysinfo + sysinfo = await worker_with_job._get_sysinfo() + assert "airflow_version" in sysinfo + assert "edge_provider_version" in sysinfo + assert "python_version" in sysinfo + assert "concurrency" in sysinfo + assert "worker_start_time" in sysinfo + assert sysinfo["worker_start_time"] == worker_with_job.worker_start_time + assert sysinfo["status"] == logging.WARNING + assert "status_text" in sysinfo + @pytest.mark.asyncio async def test_get_sysinfo_extended(self, worker_with_job_and_sysinfo: EdgeWorker): concurrency = 42 diff --git a/providers/edge3/tests/unit/edge3/worker_api/routes/test_worker.py b/providers/edge3/tests/unit/edge3/worker_api/routes/test_worker.py index fef1c9d50256a..099ba7e882739 100644 --- a/providers/edge3/tests/unit/edge3/worker_api/routes/test_worker.py +++ b/providers/edge3/tests/unit/edge3/worker_api/routes/test_worker.py @@ -37,11 +37,14 @@ from airflow.providers.edge3.worker_api.datamodels import WorkerQueueUpdateBody, WorkerStateBody from airflow.providers.edge3.worker_api.routes.worker import ( _assert_version, + _version, register, set_state, update_queues, ) +from tests_common.test_utils.config import conf_vars + if TYPE_CHECKING: from sqlalchemy.orm import Session @@ -68,6 +71,11 @@ def cli_worker(self, tmp_path: Path) -> EdgeWorker: def setup_test_cases(self, session: Session): session.execute(delete(EdgeWorkerModel)) + def test_version(self): + assert _version("1.2.3") == (1, 2, 3) + assert _version("1.2.3rc1") == (1, 2, 3) + assert _version("1.2.3.dev0") == (1, 2, 3) + def test_assert_version(self): from airflow import __version__ as airflow_version from airflow.providers.edge3 import __version__ as edge_provider_version @@ -87,6 +95,22 @@ def test_assert_version(self): with pytest.raises(HTTPException): _assert_version({"airflow_version": airflow_version, "edge_provider_version": "2023.10.07"}) + with conf_vars({("edge", "minimum_acceptable_edge_version_for_workers"): "3.2.0"}): + with pytest.raises(HTTPException): + _assert_version({"airflow_version": airflow_version, "edge_provider_version": "3.1.0"}) + + _assert_version({"airflow_version": airflow_version, "edge_provider_version": "3.2.0"}) + _assert_version({"airflow_version": airflow_version, "edge_provider_version": "3.2.1rc1"}) + _assert_version({"airflow_version": airflow_version, "edge_provider_version": "4.1.1"}) + + with conf_vars({("edge", "minimum_acceptable_core_version_for_workers"): "3.1.0"}): + with pytest.raises(HTTPException): + _assert_version({"airflow_version": "3.0.0", "edge_provider_version": edge_provider_version}) + + _assert_version({"airflow_version": "3.1.0", "edge_provider_version": edge_provider_version}) + _assert_version({"airflow_version": "3.2.0rc3", "edge_provider_version": edge_provider_version}) + _assert_version({"airflow_version": "4.1.0", "edge_provider_version": edge_provider_version}) + _assert_version({"airflow_version": airflow_version, "edge_provider_version": edge_provider_version}) @pytest.mark.parametrize(