From 99d53536e4ff2ae49c16dcfd228748d5a298ad5c Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sun, 1 Dec 2024 16:42:11 +0100 Subject: [PATCH] Make Edge API retries configurable --- providers/src/airflow/providers/edge/CHANGELOG.rst | 8 ++++++++ providers/src/airflow/providers/edge/__init__.py | 2 +- .../src/airflow/providers/edge/cli/api_client.py | 14 ++++++++++++-- providers/src/airflow/providers/edge/provider.yaml | 2 +- 4 files changed, 22 insertions(+), 4 deletions(-) diff --git a/providers/src/airflow/providers/edge/CHANGELOG.rst b/providers/src/airflow/providers/edge/CHANGELOG.rst index 9db03fbedf327..39e64bd906134 100644 --- a/providers/src/airflow/providers/edge/CHANGELOG.rst +++ b/providers/src/airflow/providers/edge/CHANGELOG.rst @@ -27,6 +27,14 @@ Changelog --------- +0.9.3pre0 +......... + +Misc +~~~~ + +* ``Make API retries configurable via ENV. Connection loss is sustained for 5min by default.`` + 0.9.2pre0 ......... diff --git a/providers/src/airflow/providers/edge/__init__.py b/providers/src/airflow/providers/edge/__init__.py index 4d6f15fb48da3..747a495dac137 100644 --- a/providers/src/airflow/providers/edge/__init__.py +++ b/providers/src/airflow/providers/edge/__init__.py @@ -29,7 +29,7 @@ __all__ = ["__version__"] -__version__ = "0.9.2pre0" +__version__ = "0.9.3pre0" if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse( "2.10.0" diff --git a/providers/src/airflow/providers/edge/cli/api_client.py b/providers/src/airflow/providers/edge/cli/api_client.py index 483c5ab3759e5..50f5171615de8 100644 --- a/providers/src/airflow/providers/edge/cli/api_client.py +++ b/providers/src/airflow/providers/edge/cli/api_client.py @@ -18,6 +18,7 @@ import json import logging +import os from datetime import datetime from http import HTTPStatus from pathlib import Path @@ -47,6 +48,13 @@ logger = logging.getLogger(__name__) +# Hidden config options for Edge Worker how retries on HTTP requests should be handled +# Note: Given defaults make attempts after 1, 3, 7, 15, 31seconds, 1:03, 2:07, 3:37 and fails after 5:07min +AIRFLOW__EDGE__API_RETRIES = int(os.getenv("AIRFLOW__EDGE__API_RETRIES", 10)) +AIRFLOW__EDGE__API_RETRY_WAIT_MIN = int(os.getenv("AIRFLOW__EDGE__API_RETRY_WAIT_MIN", 1)) +AIRFLOW__EDGE__API_RETRY_WAIT_MAX = int(os.getenv("AIRFLOW__EDGE__API_RETRY_WAIT_MAX", 90)) + + def _is_retryable_exception(exception: BaseException) -> bool: """ Evaluate which exception types to retry. @@ -66,8 +74,10 @@ def _is_retryable_exception(exception: BaseException) -> bool: @tenacity.retry( - stop=tenacity.stop_after_attempt(10), # TODO: Make this configurable - wait=tenacity.wait_exponential(min=1), # TODO: Make this configurable + stop=tenacity.stop_after_attempt(AIRFLOW__EDGE__API_RETRIES), + wait=tenacity.wait_exponential( + min=AIRFLOW__EDGE__API_RETRY_WAIT_MIN, max=AIRFLOW__EDGE__API_RETRY_WAIT_MAX + ), retry=tenacity.retry_if_exception(_is_retryable_exception), before_sleep=tenacity.before_log(logger, logging.WARNING), ) diff --git a/providers/src/airflow/providers/edge/provider.yaml b/providers/src/airflow/providers/edge/provider.yaml index 4ce807be94640..5c654facaf27b 100644 --- a/providers/src/airflow/providers/edge/provider.yaml +++ b/providers/src/airflow/providers/edge/provider.yaml @@ -27,7 +27,7 @@ source-date-epoch: 1729683247 # note that those versions are maintained by release manager - do not update them manually versions: - - 0.9.2pre0 + - 0.9.3pre0 dependencies: - apache-airflow>=2.10.0