Skip to content

Commit

Permalink
[Edge] Fix edge worker api support none default base api url (#44732)
Browse files Browse the repository at this point in the history
* Fix method compare of api call

* Removed first slash from prue_method name

* Update providers/src/airflow/providers/edge/CHANGELOG.rst

* Update providers/src/airflow/providers/edge/worker_api/auth.py

Co-authored-by: Jens Scheffler <[email protected]>

---------

Co-authored-by: Marco Küttelwesch <[email protected]>
Co-authored-by: Jens Scheffler <[email protected]>
  • Loading branch information
3 people authored Dec 9, 2024
1 parent 07bacd5 commit db14b8c
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 7 deletions.
8 changes: 8 additions & 0 deletions providers/src/airflow/providers/edge/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@
Changelog
---------

0.9.2pre0
.........

Misc
~~~~

* ``Fix check edge worker api call authentication with different base url. Authentication failed when Airflow is not installed in webserver root.``

0.9.1pre0
.........

Expand Down
2 changes: 1 addition & 1 deletion providers/src/airflow/providers/edge/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

__all__ = ["__version__"]

__version__ = "0.9.1pre0"
__version__ = "0.9.2pre0"

if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
"2.10.0"
Expand Down
5 changes: 2 additions & 3 deletions providers/src/airflow/providers/edge/cli/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from http import HTTPStatus
from pathlib import Path
from typing import TYPE_CHECKING, Any
from urllib.parse import quote, urljoin, urlparse
from urllib.parse import quote, urljoin

import requests
import tenacity
Expand Down Expand Up @@ -74,11 +74,10 @@ def _is_retryable_exception(exception: BaseException) -> bool:
def _make_generic_request(method: str, rest_path: str, data: str | None = None) -> Any:
signer = jwt_signer()
api_url = conf.get("edge", "api_url")
path = urlparse(api_url).path.replace("/rpcapi", "")
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": signer.generate_signed_token({"method": str(Path(path, rest_path))}),
"Authorization": signer.generate_signed_token({"method": rest_path}),
}
api_endpoint = urljoin(api_url, rest_path)
response = requests.request(method, url=api_endpoint, data=data, headers=headers)
Expand Down
2 changes: 1 addition & 1 deletion providers/src/airflow/providers/edge/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ source-date-epoch: 1729683247

# note that those versions are maintained by release manager - do not update them manually
versions:
- 0.9.1pre0
- 0.9.2pre0

dependencies:
- apache-airflow>=2.10.0
Expand Down
9 changes: 7 additions & 2 deletions providers/src/airflow/providers/edge/worker_api/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,18 @@ def _forbidden_response(message: str):
def jwt_token_authorization(method: str, authorization: str):
"""Check if the JWT token is correct."""
try:
# worker sends method without api_url
api_url = conf.get("edge", "api_url")
base_url = conf.get("webserver", "base_url")
url_prefix = api_url.replace(base_url, "").replace("/rpcapi", "/")
pure_method = method.replace(url_prefix, "")
payload = jwt_signer().verify_token(authorization)
signed_method = payload.get("method")
if not signed_method or signed_method != method:
if not signed_method or signed_method != pure_method:
_forbidden_response(
"Invalid method in token authorization. "
f"signed method='{signed_method}' "
f"called method='{method}'",
f"called method='{pure_method}'",
)
except BadSignature:
_forbidden_response("Bad Signature. Please use only the tokens provided by the API.")
Expand Down

0 comments on commit db14b8c

Please sign in to comment.