diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index 96a1b420e3522..79fb88a8a2b61 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -528,7 +528,8 @@ "edge": { "deps": [ "apache-airflow>=2.10.0", - "pydantic>=2.10.2" + "pydantic>=2.10.2", + "retryhttp>=1.2.0" ], "devel-deps": [], "plugins": [ diff --git a/providers/src/airflow/providers/edge/cli/api_client.py b/providers/src/airflow/providers/edge/cli/api_client.py index 50f5171615de8..174e9d9216bfd 100644 --- a/providers/src/airflow/providers/edge/cli/api_client.py +++ b/providers/src/airflow/providers/edge/cli/api_client.py @@ -26,9 +26,8 @@ from urllib.parse import quote, urljoin import requests -import tenacity -from requests.exceptions import ConnectionError -from urllib3.exceptions import NewConnectionError +from retryhttp import retry, wait_retry_after +from tenacity import before_log, wait_random_exponential from airflow.configuration import conf from airflow.exceptions import AirflowException @@ -50,36 +49,28 @@ # Hidden config options for Edge Worker how retries on HTTP requests should be handled # Note: Given defaults make attempts after 1, 3, 7, 15, 31seconds, 1:03, 2:07, 3:37 and fails after 5:07min -AIRFLOW__EDGE__API_RETRIES = int(os.getenv("AIRFLOW__EDGE__API_RETRIES", 10)) -AIRFLOW__EDGE__API_RETRY_WAIT_MIN = int(os.getenv("AIRFLOW__EDGE__API_RETRY_WAIT_MIN", 1)) -AIRFLOW__EDGE__API_RETRY_WAIT_MAX = int(os.getenv("AIRFLOW__EDGE__API_RETRY_WAIT_MAX", 90)) - - -def _is_retryable_exception(exception: BaseException) -> bool: - """ - Evaluate which exception types to retry. - - This is especially demanded for cases where an application gateway or Kubernetes ingress can - not find a running instance of a webserver hosting the API (HTTP 502+504) or when the - HTTP request fails in general on network level. - - Note that we want to fail on other general errors on the webserver not to send bad requests in an endless loop. 
- """ - retryable_status_codes = (HTTPStatus.BAD_GATEWAY, HTTPStatus.GATEWAY_TIMEOUT) - return ( - isinstance(exception, AirflowException) - and exception.status_code in retryable_status_codes - or isinstance(exception, (ConnectionError, NewConnectionError)) - ) +# So far there is no other config facility in Task SDK, so we use ENV for the moment +# TODO: Consider these env variables jointly in Task SDK together with task_sdk/src/airflow/sdk/api/client.py +API_RETRIES = int(os.getenv("AIRFLOW__EDGE__API_RETRIES", os.getenv("AIRFLOW__WORKERS__API_RETRIES", 10))) +API_RETRY_WAIT_MIN = float( + os.getenv("AIRFLOW__EDGE__API_RETRY_WAIT_MIN", os.getenv("AIRFLOW__WORKERS__API_RETRY_WAIT_MIN", 1.0)) +) +API_RETRY_WAIT_MAX = float( + os.getenv("AIRFLOW__EDGE__API_RETRY_WAIT_MAX", os.getenv("AIRFLOW__WORKERS__API_RETRY_WAIT_MAX", 90.0)) +) + + +_default_wait = wait_random_exponential(min=API_RETRY_WAIT_MIN, max=API_RETRY_WAIT_MAX) -@tenacity.retry( - stop=tenacity.stop_after_attempt(AIRFLOW__EDGE__API_RETRIES), - wait=tenacity.wait_exponential( - min=AIRFLOW__EDGE__API_RETRY_WAIT_MIN, max=AIRFLOW__EDGE__API_RETRY_WAIT_MAX - ), - retry=tenacity.retry_if_exception(_is_retryable_exception), - before_sleep=tenacity.before_log(logger, logging.WARNING), +@retry( + reraise=True, + max_attempt_number=API_RETRIES, + wait_server_errors=_default_wait, + wait_network_errors=_default_wait, + wait_timeouts=_default_wait, + wait_rate_limited=wait_retry_after(fallback=_default_wait), # No infinite timeout on HTTP 429 + before_sleep=before_log(logger, logging.WARNING), ) def _make_generic_request(method: str, rest_path: str, data: str | None = None) -> Any: signer = jwt_signer() diff --git a/providers/src/airflow/providers/edge/provider.yaml b/providers/src/airflow/providers/edge/provider.yaml index aa48ce77d5c35..6628aceab5786 100644 --- a/providers/src/airflow/providers/edge/provider.yaml +++ b/providers/src/airflow/providers/edge/provider.yaml @@ -32,6 +32,7 @@ versions: dependencies: - 
apache-airflow>=2.10.0 - pydantic>=2.10.2 + - retryhttp>=1.2.0 plugins: - name: edge_executor