Skip to content
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
66d464b
fix: move parse_keycloak_from_openid_url to common/auth package to reuse
botanical Feb 12, 2026
a17407f
fix: linting and generalize regexes to use in pep middleware
botanical Feb 13, 2026
921095c
feat: create policy enforcement point for stac api
botanical Feb 13, 2026
2d96128
feat: add pep middleware to stac api
botanical Feb 13, 2026
9d08bef
fix: import common auth in dockerfile
botanical Feb 13, 2026
9e5a724
fix: attempt to resolve dependencies
botanical Feb 13, 2026
0629819
fix: add debugging
botanical Feb 13, 2026
283f9ab
fix: env vars for lambda
botanical Feb 13, 2026
605a881
fix: update to use keycloak secret instead
botanical Feb 13, 2026
bb514b4
fix: add prefix to keycloak secret env var
botanical Feb 13, 2026
e4c4a9d
fix: update error message, and also update keycloak client to check r…
botanical Feb 13, 2026
b0a0342
fix: catch invalid token case and throw helpful error
botanical Feb 13, 2026
38bc05f
fix: move pep middleware to common auth so it's reusable
botanical Feb 14, 2026
c64d035
fix: add pep middleware to ingest api, delete old middleware file in …
botanical Feb 14, 2026
bec19a6
fix: formatting
botanical Feb 14, 2026
3fa37e1
feat: add integration tests for proof of concept
botanical Feb 17, 2026
cd79d46
fix: attempt to fix mutable mapping incompatible type issue
botanical Feb 17, 2026
266818c
fix: attempt to fix tests, and enable transactions on pep int tests
botanical Feb 18, 2026
480d5bb
fix: add logging to conftest to debug
botanical Feb 18, 2026
e7c004f
fix: linting
botanical Feb 18, 2026
aaf029a
fix: clear cache and reload config and app to properly register endpoint
botanical Feb 18, 2026
e4f0113
fix: add root path to pep int tests
botanical Feb 18, 2026
e360ced
fix: add back arg comment
botanical Feb 18, 2026
a6e4f60
Update common/auth/veda_auth/keycloak_client.py
botanical Feb 19, 2026
5c0b8c1
fix: add path to error message, and add error message function for pe…
botanical Feb 19, 2026
b2380c2
fix: add case to handle nonexistent tenant
botanical Mar 2, 2026
9b033e0
fix: attempt to fix tests
botanical Mar 2, 2026
2176ff9
fix: create new error for permission denied
botanical Mar 6, 2026
601fdc1
Merge branch 'develop' into mt-uma/pep-poc
botanical Mar 6, 2026
31c8873
fix: remove return type on check_permission
botanical Mar 6, 2026
a8a9a55
fix: formatting, remove unused variable
botanical Mar 6, 2026
6721ff0
fix: update readme
botanical Mar 6, 2026
afdc19a
fix: update tests
botanical Mar 6, 2026
243a793
fix: spacing, move import
botanical Mar 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/tests/test_stac.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ def test_links_with_tenant(self, tenant, collections, endpoint, method):
url = f"{base}/{endpoint}"
url = url.replace("{collection_id}", collection_id)
url = url.replace("{item_id}", item_id)
resp = httpx.request(method, url)
resp = httpx.request(method, url, json={} if method == "POST" else None)
assert resp.status_code == 200
data = resp.json()

Expand Down
144 changes: 111 additions & 33 deletions common/auth/veda_auth/keycloak_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,63 @@
import base64
import json
import logging
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlencode
from typing import Any, Dict, List, Optional, Tuple, Union
from urllib.parse import urlencode, urlparse

import httpx

logger = logging.getLogger(__name__)


class TokenError(Exception):
    """Signal that the access token is expired, revoked, or invalid.

    Corresponds to Keycloak returning HTTP 401.
    """

    def __init__(self, detail: str = "Access token is expired or invalid"):
        """Store ``detail`` and use it as the exception message.

        Intended for token errors encountered during the RPT call.
        """
        super().__init__(detail)
        # Kept as an attribute so handlers can surface it without str(exc).
        self.detail = detail


class ResourceNotFoundError(Exception):
    """Signal that Keycloak answered HTTP 400 with an ``invalid_resource`` error.

    In that case the requested resource (tenant) does not exist in the
    resource server.
    """

    def __init__(self, resource_id: str):
        """Record the missing resource ID and build the exception message."""
        message = f"Resource not found: {resource_id}"
        super().__init__(message)
        self.resource_id = resource_id


def parse_keycloak_from_openid_url(
    openid_configuration_url: Union[str, Any]
) -> Tuple[str, str]:
    """Extract the Keycloak base URL and realm from an OpenID discovery URL.

    The expected shape is
    ``https://<host>/realms/<realm>/.well-known/openid-configuration``.

    Args:
        openid_configuration_url: The discovery URL, either a string or any
            object whose ``str()`` form is the URL.

    Returns:
        A ``(keycloak_url, realm)`` tuple. ``keycloak_url`` is
        ``<scheme>://<netloc>`` only; any path prefix in front of
        ``/realms/`` is not included. ``realm`` is the path segment that
        follows the last ``/realms/`` in the path.

    Raises:
        ValueError: If the URL is missing/empty, its path has no
            ``/realms/<realm>`` part, or it is not an absolute URL
            (no scheme or no host).
    """
    if not openid_configuration_url:
        raise ValueError("Missing or empty OpenID configuration URL")
    url_str = str(openid_configuration_url).strip()

    parsed = urlparse(url_str)
    path = (parsed.path or "").rstrip("/")

    if "/realms/" not in path:
        raise ValueError(
            "OpenID configuration URL must contain /realms/<realm>/ "
            "(e.g. .../realms/my-realm/.well-known/openid-configuration). "
            f"Got path: {repr(path)}"
        )

    realm = path.split("/realms/")[-1].split("/")[0]
    if not realm:
        raise ValueError("Could not extract realm from OpenID configuration URL")

    # Without a scheme and host the base URL would degrade to "://", which
    # only fails later when a request is built from it. Fail here instead.
    if not parsed.scheme or not parsed.netloc:
        raise ValueError(
            "OpenID configuration URL must be absolute "
            "(e.g. https://<host>/realms/<realm>/...). "
            f"Got: {repr(url_str)}"
        )

    keycloak_url = f"{parsed.scheme}://{parsed.netloc}"
    return keycloak_url, realm


def _add_base64_padding(payload: str) -> str:
"""Add padding to base64 string if needed

Expand Down Expand Up @@ -157,13 +206,69 @@ def _extract_permissions_from_jwt(self, jwt_token: str) -> List[Dict[str, Any]]:
logger.warning(f"Failed to extract permissions from JWT: {e}")
return []

def _resolve_permissions(
self, rpt_response: Dict[str, Any]
) -> List[Dict[str, Any]]:
"""Extract permissions from an RPT response, and fall back to the JWT"""
permissions = rpt_response.get("permissions", [])
if not permissions:
rpt_jwt = rpt_response.get("access_token")
if rpt_jwt:
permissions = self._extract_permissions_from_jwt(rpt_jwt)
logger.debug(f"Extracted {len(permissions)} permissions from RPT JWT")
return permissions

def _has_matching_permission(
self,
permissions: List[Dict[str, Any]],
resource_id: str,
scope: str,
) -> bool:
"""Return True if permissions contain a grant for resource_id and scope

See https://www.keycloak.org/docs/latest/authorization_services/#_service_rpt_overview
"""
for permission in permissions:
rsname = permission.get("rsname") or permission.get("resource_id")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm unclear on why rsid isn't relevant to check anymore

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that's a good question! Basically, we're not using rsid because the RPT puts the name in rsname (e.g. "stac:item:faketenant2:*") and the UUID in rsid (and sometimes in resource_id). You can check this yourself by requesting an RPT and introspecting it; it will show something like:

{
            "scopes": [
                "read"
            ],
            "rsid": "some-uuid",
            "rsname": "stac:item:faketenant2:*",
            "resource_id": "some-uuid",
            "resource_scopes": [
                "read"
            ]
        }

We need the name to match and to parse type/tenant, so we use rsname (and resource_id when it’s the name). rsid only ever gives us the UUID which we don't use. I guess really we don't need resource_id but I had that as a fallback because I wasn't sure where keycloak would put the resource name.

if rsname == resource_id and scope in permission.get("scopes", []):
return True
return False

def _handle_rpt_http_error(
self, error: httpx.HTTPStatusError, resource_id: str
) -> bool:
"""Translate an HTTPStatusError from get_rpt

Returns False for a 403 (permission denied).
Raises TokenError for 401, ResourceNotFoundError for 400 invalid_resource.
Re-raises unhandled status codes.
"""
if error.response.status_code == 401:
logger.warning("Token rejected (401): %s", error.response.text)
raise TokenError(
"Access token is expired or invalid. Please re-authenticate."
) from error
if error.response.status_code == 403:
return False
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about raising another error (PermissionDeniedError?) here and catching it in the middleware? I think the return False/raise Error pattern is a bit confusing, this way we could replace https://github.com/NASA-IMPACT/veda-backend/pull/569/changes#diff-4857746bfd90942dbb37adfab0448bb3a8c3683230e3fadf002e8e89b615445cR187-R202 with another except block

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think your suggestion makes sense! The bool/error return is confusing, sorry about that.
2176ff9 What do you think of something like this?

if error.response.status_code == 400:
try:
error_body = error.response.json()
except Exception:
error_body = {}
if error_body.get("error") == "invalid_resource":
raise ResourceNotFoundError(resource_id=resource_id) from error
logger.error(
f"Permission check failed: {error.response.status_code} {error.response.text}"
)
raise error

def check_permission(
self,
access_token: str,
resource_id: str,
scope: str,
) -> bool:
"""Check if user has permission for a resource and scope
"""Check if user has permission for a resource and scope.

Args:
access_token: User's access token
Expand All @@ -183,37 +288,10 @@ def check_permission(
}
],
)

permissions = rpt_response.get("permissions", [])
if not permissions:
rpt_jwt = rpt_response.get("access_token")
if rpt_jwt:
permissions = self._extract_permissions_from_jwt(rpt_jwt)
logger.debug(
f"Extracted {len(permissions)} permissions from RPT JWT"
)

# https://www.keycloak.org/docs/latest/authorization_services/#_service_rpt_overview
for permission in permissions:
# Check rsid (RPT token format), resource_id (introspection format), or rsname (resource name)
resource_identifier = (
permission.get("rsid")
or permission.get("resource_id")
or permission.get("rsname")
)
if resource_identifier == resource_id:
scopes = permission.get("scopes", [])
if scope in scopes:
return True

return False
permissions = self._resolve_permissions(rpt_response)
return self._has_matching_permission(permissions, resource_id, scope)
except httpx.HTTPStatusError as e:
if e.response.status_code in (401, 403):
return False
logger.error(
f"Permission check failed: {e.response.status_code} {e.response.text}"
)
raise
return self._handle_rpt_http_error(e, resource_id)
except Exception as e:
logger.error(f"Unexpected error checking permission: {e}")
raise
Expand Down
204 changes: 204 additions & 0 deletions common/auth/veda_auth/pep_middleware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
"""Policy Enforcement Point (PEP) middleware"""

import logging
import re
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional, Sequence

from veda_auth.keycloak_client import (
KeycloakPDPClient,
ResourceNotFoundError,
TokenError,
)
from veda_auth.resource_extractors import COLLECTIONS_CREATE_PATH_RE

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import JSONResponse, Response
from starlette.types import ASGIApp

logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class ProtectedRoute:
    """A route whose requests are checked against the policy decision point.

    Instances are immutable (frozen dataclass).

    Attributes:
        path_re: Regex pattern applied to the request path.
        method: HTTP method the route applies to.
        scope: Keycloak resource scope name to check
            (e.g. "create", "update", "delete").
    """

    path_re: str
    method: str
    scope: str


# Routes enforced when PEPMiddleware is constructed without an explicit
# ``protected_routes`` argument: POST requests whose path matches
# COLLECTIONS_CREATE_PATH_RE are checked against the "create" scope.
DEFAULT_PROTECTED_ROUTES: Sequence[ProtectedRoute] = (
    ProtectedRoute(path_re=COLLECTIONS_CREATE_PATH_RE, method="POST", scope="create"),
)


def pep_error_response(
    status_code: int,
    detail: str,
    headers: Optional[dict] = None,
) -> JSONResponse:
    """Build the JSON error response used by the PEP middleware.

    Args:
        status_code: HTTP status code of the response.
        detail: Message placed under the ``"detail"`` key of the JSON body.
        headers: Optional response headers; an empty dict is used when
            none (or an empty mapping) is given.

    Returns:
        A ``JSONResponse`` whose body is ``{"detail": detail}``.
    """
    response_headers = headers if headers else {}
    body = {"detail": detail}
    return JSONResponse(
        content=body,
        status_code=status_code,
        headers=response_headers,
    )


class PEPMiddleware(BaseHTTPMiddleware):
"""Middleware that enforces UMA authorization"""

def __init__(
    self,
    app: ASGIApp,
    *,
    pdp_client: Callable[[], KeycloakPDPClient],
    resource_extractor: Callable[[Request], Awaitable[Optional[str]]],
    protected_routes: Optional[Sequence[ProtectedRoute]] = None,
):
    """Set up the middleware.

    Args:
        app: The wrapped ASGI application.
        pdp_client: Zero-argument callable that returns the
            ``KeycloakPDPClient`` to use.
        resource_extractor: Awaitable callable that maps a request to a
            resource ID, or ``None`` when no resource can be determined.
        protected_routes: Routes to enforce. When ``None``,
            ``DEFAULT_PROTECTED_ROUTES`` is used.
    """
    super().__init__(app)
    self._get_pdp_client = pdp_client
    self._extract_resource_id = resource_extractor

    if protected_routes is None:
        protected_routes = DEFAULT_PROTECTED_ROUTES

    # Compile each pattern once up front and normalise the method to upper
    # case so request matching is a plain comparison.
    compiled = []
    for route in protected_routes:
        compiled.append(
            (re.compile(route.path_re), route.method.upper(), route.scope)
        )
    self._compiled = compiled

def _get_matching_scope_and_route(
self, request: Request
) -> Optional[tuple[str, str]]:
"""Return (scope, method) for the route that matches, otherwise return None"""
path = request.url.path.rstrip("/") or "/"
method = request.method.upper()
for pattern, route_method, scope in self._compiled:
if route_method == method and pattern.search(path):
return (scope, route_method)
return None

def _get_bearer_token(self, request: Request) -> Optional[str]:
"""Extract the Bearer token from the Authorization header"""
auth = request.headers.get("Authorization")
if not auth or not auth.startswith("Bearer "):
return None
return auth[7:].strip()

async def dispatch(self, request: Request, call_next: Callable) -> Response:
"""Check UMA authorization for protected routes, pass through otherwise."""
matched_request = self._get_matching_scope_and_route(request)
if matched_request is None:
logger.debug(
"PEP: no protected route match for %s %s... continuing",
request.method,
request.url.path,
)
return await call_next(request)

scope, _method = matched_request
logger.info(
"PEP: matched protected route %s %s and scope=%s",
_method,
request.url.path,
scope,
)

pdp_client = self._get_pdp_client()

token = self._get_bearer_token(request)
if not token:
logger.warning(
"PEP: missing Bearer token for %s %s", _method, request.url.path
)
return pep_error_response(
401,
"Missing or invalid Authorization header (Bearer token required)",
{"WWW-Authenticate": "Bearer"},
)

resource_id = await self._extract_resource_id(request)
if not resource_id:
logger.warning("PEP: no resource ID for %s %s", _method, request.url.path)
return pep_error_response(
403, "Could not determine resource for authorization"
)

logger.info(
"PEP: checking permission resource_id=%s, scope=%s, path=%s",
resource_id,
scope,
request.url.path,
)

try:
authorized = pdp_client.check_permission(
access_token=token,
resource_id=resource_id,
scope=scope,
)
except TokenError as e:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Appreciate the structured and comprehensive error responses!

logger.warning(
"PEP: token error for %s %s: %s", _method, request.url.path, e.detail
)
return pep_error_response(
401, e.detail, {"WWW-Authenticate": 'Bearer error="invalid_token"'}
)
except ResourceNotFoundError as e:
logger.warning(
"PEP: resource not found for %s %s: %s",
_method,
request.url.path,
e.resource_id,
)
return pep_error_response(
404,
f"The requested tenant resource ({e.resource_id}) does not exist. "
"Verify that the tenant name is correct.",
)
except Exception as e:
logger.exception(
"PEP: Keycloak check failed for resource_id=%s scope=%s: %s",
resource_id,
scope,
e,
)
return pep_error_response(
502, "Authorization service temporarily unavailable"
)

logger.info(
"PEP: authorization result=%s for resource_id=%s, scope=%s, path=%s",
authorized,
resource_id,
scope,
request.url.path,
)

if not authorized:
logger.warning(
"PEP: denied %s %s resource_id=%s, scope=%s",
_method,
request.url.path,
resource_id,
scope,
)
return pep_error_response(
403,
(
f"You do not have permission to {scope} this resource "
f"({resource_id}). Verify that your user belongs to "
f"the required tenant and role needed."
),
)

return await call_next(request)
Loading