Skip to content

Commit

Permalink
Deferrable support for HttpOperator (#45228)
Browse files Browse the repository at this point in the history
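* Corrected the relationship between session and response: the trigger now opens the `aiohttp.ClientSession` and passes it to `hook.run()`, instead of the hook creating and closing its own session.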

* Added HttpMethodException, raised for unexpected HTTP methods

* Update providers/src/airflow/providers/http/hooks/http.py

Co-authored-by: Wei Lee <[email protected]>

* Update providers/src/airflow/providers/http/hooks/http.py

Co-authored-by: Wei Lee <[email protected]>

* fix for review

* fix for pre-commit

---------

Co-authored-by: Wei Lee <[email protected]>
  • Loading branch information
TakayukiTanabeSS and Lee-W authored Jan 17, 2025
1 parent 5797edc commit f3666e7
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 116 deletions.
27 changes: 27 additions & 0 deletions providers/src/airflow/providers/http/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from airflow.exceptions import AirflowException


class HttpErrorException(AirflowException):
    """
    Exception raised for HTTP error in Http hook.

    The hook's async ``run`` raises this with a ``"<status>:<message>"`` text when a
    response fails ``raise_for_status()`` and the error is either not retryable or the
    last allowed attempt has been used. It subclasses ``AirflowException``, so callers
    that catch ``AirflowException`` also catch it.
    """


class HttpMethodException(AirflowException):
    """
    Exception raised for invalid HTTP methods in Http hook.

    The hook's async ``run`` raises this when its configured method is not one of
    GET, POST, PATCH, HEAD, PUT, DELETE or OPTIONS. It subclasses
    ``AirflowException``, so callers that catch ``AirflowException`` also catch it.
    """
90 changes: 44 additions & 46 deletions providers/src/airflow/providers/http/hooks/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
# under the License.
from __future__ import annotations

import asyncio
from typing import TYPE_CHECKING, Any, Callable
from urllib.parse import urlparse

Expand All @@ -32,6 +31,7 @@

from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from airflow.providers.http.exceptions import HttpErrorException, HttpMethodException

if TYPE_CHECKING:
from aiohttp.client_reqrep import ClientResponse
Expand Down Expand Up @@ -359,6 +359,7 @@ def __init__(

async def run(
self,
session: aiohttp.ClientSession,
endpoint: str | None = None,
data: dict[str, Any] | str | None = None,
json: dict[str, Any] | str | None = None,
Expand Down Expand Up @@ -410,54 +411,51 @@ async def run(

url = _url_from_endpoint(self.base_url, endpoint)

async with aiohttp.ClientSession() as session:
if self.method == "GET":
request_func = session.get
elif self.method == "POST":
request_func = session.post
elif self.method == "PATCH":
request_func = session.patch
elif self.method == "HEAD":
request_func = session.head
elif self.method == "PUT":
request_func = session.put
elif self.method == "DELETE":
request_func = session.delete
elif self.method == "OPTIONS":
request_func = session.options
else:
raise AirflowException(f"Unexpected HTTP Method: {self.method}")

for attempt in range(1, 1 + self.retry_limit):
response = await request_func(
if self.method == "GET":
request_func = session.get
elif self.method == "POST":
request_func = session.post
elif self.method == "PATCH":
request_func = session.patch
elif self.method == "HEAD":
request_func = session.head
elif self.method == "PUT":
request_func = session.put
elif self.method == "DELETE":
request_func = session.delete
elif self.method == "OPTIONS":
request_func = session.options
else:
raise HttpMethodException(f"Unexpected HTTP Method: {self.method}")

for attempt in range(1, 1 + self.retry_limit):
response = await request_func(
url,
params=data if self.method == "GET" else None,
data=data if self.method in ("POST", "PUT", "PATCH") else None,
json=json,
headers=_headers,
auth=auth,
**extra_options,
)
try:
response.raise_for_status()
except ClientResponseError as e:
self.log.warning(
"[Try %d of %d] Request to %s failed.",
attempt,
self.retry_limit,
url,
params=data if self.method == "GET" else None,
data=data if self.method in ("POST", "PUT", "PATCH") else None,
json=json,
headers=_headers,
auth=auth,
**extra_options,
)
try:
response.raise_for_status()
except ClientResponseError as e:
self.log.warning(
"[Try %d of %d] Request to %s failed.",
attempt,
self.retry_limit,
url,
)
if not self._retryable_error_async(e) or attempt == self.retry_limit:
self.log.exception("HTTP error with status: %s", e.status)
# In this case, the user probably made a mistake.
# Don't retry.
raise AirflowException(f"{e.status}:{e.message}")
else:
await asyncio.sleep(self.retry_delay)
else:
return response
if not self._retryable_error_async(e) or attempt == self.retry_limit:
self.log.exception("HTTP error with status: %s", e.status)
# In this case, the user probably made a mistake.
# Don't retry.
raise HttpErrorException(f"{e.status}:{e.message}")
else:
raise NotImplementedError # should not reach this, but makes mypy happy
return response

raise NotImplementedError # should not reach this, but makes mypy happy

@classmethod
def _process_extra_options_from_connection(cls, conn: Connection, extra_options: dict) -> dict:
Expand Down
31 changes: 18 additions & 13 deletions providers/src/airflow/providers/http/triggers/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from collections.abc import AsyncIterator
from typing import TYPE_CHECKING, Any

import aiohttp
import requests
from requests.cookies import RequestsCookieJar
from requests.structures import CaseInsensitiveDict
Expand Down Expand Up @@ -94,13 +95,15 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
auth_type=self.auth_type,
)
try:
client_response = await hook.run(
endpoint=self.endpoint,
data=self.data,
headers=self.headers,
extra_options=self.extra_options,
)
response = await self._convert_response(client_response)
async with aiohttp.ClientSession() as session:
client_response = await hook.run(
session=session,
endpoint=self.endpoint,
data=self.data,
headers=self.headers,
extra_options=self.extra_options,
)
response = await self._convert_response(client_response)
yield TriggerEvent(
{
"status": "success",
Expand Down Expand Up @@ -181,12 +184,14 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
hook = self._get_async_hook()
while True:
try:
await hook.run(
endpoint=self.endpoint,
data=self.data,
headers=self.headers,
extra_options=self.extra_options,
)
async with aiohttp.ClientSession() as session:
await hook.run(
session=session,
endpoint=self.endpoint,
data=self.data,
headers=self.headers,
extra_options=self.extra_options,
)
yield TriggerEvent(True)
return
except AirflowException as exc:
Expand Down
Loading

0 comments on commit f3666e7

Please sign in to comment.