-
Notifications
You must be signed in to change notification settings - Fork 7
feat(stac): policy enforcement point proof of concept #569
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 33 commits
66d464b
a17407f
921095c
2d96128
9d08bef
9e5a724
0629819
283f9ab
605a881
bb514b4
e4c4a9d
b0a0342
38bc05f
c64d035
bec19a6
3fa37e1
cd79d46
266818c
480d5bb
e7c004f
aaf029a
e4f0113
e360ced
a6e4f60
5c0b8c1
b2380c2
9b033e0
2176ff9
601fdc1
31c8873
a8a9a55
6721ff0
afdc19a
243a793
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,14 +7,75 @@ | |
| import base64 | ||
| import json | ||
| import logging | ||
| from typing import Any, Dict, List, Optional, Tuple | ||
| from urllib.parse import urlencode | ||
| from typing import Any, Dict, List, Optional, Tuple, Union | ||
| from urllib.parse import urlencode, urlparse | ||
|
|
||
| import httpx | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class TokenError(Exception): | ||
| """Raised when the access token is expired, revoked, or invalid. | ||
| Keycloak returns HTTP 401 | ||
| """ | ||
|
|
||
| def __init__(self, detail: str = "Access token is expired or invalid"): | ||
| """To use when there is a token error for RPT call""" | ||
| self.detail = detail | ||
| super().__init__(detail) | ||
|
|
||
|
|
||
| class ResourceNotFoundError(Exception): | ||
| """Raised when Keycloak returns HTTP 400 with invalid_resource error. | ||
| This means the requested resource (tenant) does not exist in the | ||
| resource server | ||
| """ | ||
|
|
||
| def __init__(self, resource_id: str): | ||
| """Initialize with the resource ID that was not found""" | ||
| self.resource_id = resource_id | ||
| super().__init__(f"Resource not found: {resource_id}") | ||
|
|
||
|
|
||
| class PermissionDeniedError(Exception): | ||
| """Raised when Keycloak returns HTTP 403 forbidden which means | ||
| the user does not have permission for the requested resource and scope. | ||
| """ | ||
|
|
||
| def __init__(self, resource_id: str, scope: Optional[str] = None): | ||
| """Initialize with the resource ID and optional scope that was denied""" | ||
| self.resource_id = resource_id | ||
| self.scope = scope | ||
| super().__init__(f"Permission denied: {resource_id}") | ||
|
|
||
|
|
||
| def parse_keycloak_from_openid_url( | ||
| openid_configuration_url: Union[str, Any] | ||
| ) -> Tuple[str, str]: | ||
| """Extract Keycloak base URL and realm from an OpenID discovery URL such as https://<host>/realms/<realm>/.well-known/openid-configuration""" | ||
| if not openid_configuration_url: | ||
| raise ValueError("Missing or empty OpenID configuration URL") | ||
| url_str = str(openid_configuration_url).strip() | ||
|
|
||
| parsed = urlparse(url_str) | ||
| path = (parsed.path or "").rstrip("/") | ||
|
|
||
| if "/realms/" not in path: | ||
| raise ValueError( | ||
| "OpenID configuration URL must contain /realms/<realm>/ " | ||
| "(e.g. .../realms/my-realm/.well-known/openid-configuration). " | ||
| f"Got path: {repr(path)}" | ||
| ) | ||
botanical marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| realm = path.split("/realms/")[-1].split("/")[0] | ||
| if not realm: | ||
| raise ValueError("Could not extract realm from OpenID configuration URL") | ||
|
|
||
| keycloak_url = f"{parsed.scheme}://{parsed.netloc}" | ||
| return keycloak_url, realm | ||
|
|
||
|
|
||
| def _add_base64_padding(payload: str) -> str: | ||
| """Add padding to base64 string if needed | ||
|
|
||
|
|
@@ -157,12 +218,71 @@ def _extract_permissions_from_jwt(self, jwt_token: str) -> List[Dict[str, Any]]: | |
| logger.warning(f"Failed to extract permissions from JWT: {e}") | ||
| return [] | ||
|
|
||
| def _resolve_permissions( | ||
| self, rpt_response: Dict[str, Any] | ||
| ) -> List[Dict[str, Any]]: | ||
| """Extract permissions from an RPT response, and fall back to the JWT""" | ||
| permissions = rpt_response.get("permissions", []) | ||
| if not permissions: | ||
| rpt_jwt = rpt_response.get("access_token") | ||
| if rpt_jwt: | ||
| permissions = self._extract_permissions_from_jwt(rpt_jwt) | ||
| logger.debug(f"Extracted {len(permissions)} permissions from RPT JWT") | ||
| return permissions | ||
|
|
||
| def _has_matching_permission( | ||
| self, | ||
| permissions: List[Dict[str, Any]], | ||
| resource_id: str, | ||
| scope: str, | ||
| ) -> bool: | ||
| """Return True if permissions contain a grant for resource_id and scope | ||
|
|
||
| See https://www.keycloak.org/docs/latest/authorization_services/#_service_rpt_overview | ||
| """ | ||
| for permission in permissions: | ||
| rsname = permission.get("rsname") or permission.get("resource_id") | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm unclear on why
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah that's a good question! Basically we're not using We need the name to match and to parse type/tenant, so we use rsname (and resource_id when it’s the name). rsid only ever gives us the UUID which we don't use. I guess really we don't need resource_id but I had that as a fallback because I wasn't sure where keycloak would put the resource name. |
||
| if rsname == resource_id and scope in permission.get("scopes", []): | ||
| return True | ||
| return False | ||
|
|
||
| def _handle_rpt_http_error( | ||
| self, | ||
| error: httpx.HTTPStatusError, | ||
| resource_id: str, | ||
| scope: Optional[str] = None, | ||
| ) -> None: | ||
| """Translate an HTTPStatusError from get_rpt | ||
|
|
||
| Raises TokenError for 401, PermissionDeniedError for 403, | ||
| ResourceNotFoundError for 400 invalid_resource. | ||
| Re-raises unhandled status codes. | ||
| """ | ||
| if error.response.status_code == 401: | ||
| logger.warning("Token rejected (401): %s", error.response.text) | ||
| raise TokenError( | ||
| "Access token is expired or invalid. Please re-authenticate." | ||
| ) from error | ||
| if error.response.status_code == 403: | ||
| raise PermissionDeniedError(resource_id=resource_id, scope=scope) from error | ||
| if error.response.status_code == 400: | ||
| try: | ||
| error_body = error.response.json() | ||
| except Exception: | ||
| error_body = {} | ||
| if error_body.get("error") == "invalid_resource": | ||
| raise ResourceNotFoundError(resource_id=resource_id) from error | ||
| logger.error( | ||
| f"Permission check failed: {error.response.status_code} {error.response.text}" | ||
| ) | ||
| raise error | ||
|
|
||
| def check_permission( | ||
| self, | ||
| access_token: str, | ||
| resource_id: str, | ||
| scope: str, | ||
| ) -> bool: | ||
| ): | ||
| """Check if user has permission for a resource and scope | ||
|
|
||
| Args: | ||
|
|
@@ -183,37 +303,10 @@ def check_permission( | |
| } | ||
| ], | ||
| ) | ||
|
|
||
| permissions = rpt_response.get("permissions", []) | ||
| if not permissions: | ||
| rpt_jwt = rpt_response.get("access_token") | ||
| if rpt_jwt: | ||
| permissions = self._extract_permissions_from_jwt(rpt_jwt) | ||
| logger.debug( | ||
| f"Extracted {len(permissions)} permissions from RPT JWT" | ||
| ) | ||
|
|
||
| # https://www.keycloak.org/docs/latest/authorization_services/#_service_rpt_overview | ||
| for permission in permissions: | ||
| # Check rsid (RPT token format), resource_id (introspection format), or rsname (resource name) | ||
| resource_identifier = ( | ||
| permission.get("rsid") | ||
| or permission.get("resource_id") | ||
| or permission.get("rsname") | ||
| ) | ||
| if resource_identifier == resource_id: | ||
| scopes = permission.get("scopes", []) | ||
| if scope in scopes: | ||
| return True | ||
|
|
||
| return False | ||
| permissions = self._resolve_permissions(rpt_response) | ||
| return self._has_matching_permission(permissions, resource_id, scope) | ||
| except httpx.HTTPStatusError as e: | ||
| if e.response.status_code in (401, 403): | ||
| return False | ||
| logger.error( | ||
| f"Permission check failed: {e.response.status_code} {e.response.text}" | ||
| ) | ||
| raise | ||
| self._handle_rpt_http_error(e, resource_id, scope=scope) | ||
| except Exception as e: | ||
| logger.error(f"Unexpected error checking permission: {e}") | ||
| raise | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.