diff --git a/common/auth/tests/test_pep_middleware.py b/common/auth/tests/test_pep_middleware.py new file mode 100644 index 00000000..ae9eb3e5 --- /dev/null +++ b/common/auth/tests/test_pep_middleware.py @@ -0,0 +1,121 @@ +"""Unit tests for PEP middleware route matching""" + +from unittest.mock import MagicMock + +import pytest +from veda_auth.pep_middleware import ( + DEFAULT_PROTECTED_ROUTES, + STAC_PROTECTED_ROUTES, + PEPMiddleware, +) + + +def _request(path: str, method: str = "GET"): + """Request mock for route matching""" + req = MagicMock() + req.url.path = path.rstrip("/") or "/" + req.method = method.upper() + return req + + +class TestDefaultProtectedRoutes: + """DEFAULT_PROTECTED_ROUTES tests""" + + def test_post_collections_matches(self): + """POST collections should return create and POST""" + app = MagicMock() + middleware = PEPMiddleware( + app, + pdp_client=MagicMock(), + resource_extractor=MagicMock(), + protected_routes=DEFAULT_PROTECTED_ROUTES, + ) + result = middleware._get_matching_scope_and_route( + _request("/collections", "POST") + ) + assert result == ("create", "POST") + + def test_put_collections_no_match(self): + """PUT /collections/{id} does not match DEFAULT_PROTECTED_ROUTES so it returns None""" + app = MagicMock() + middleware = PEPMiddleware( + app, + pdp_client=MagicMock(), + resource_extractor=MagicMock(), + protected_routes=DEFAULT_PROTECTED_ROUTES, + ) + result = middleware._get_matching_scope_and_route( + _request("/collections/random", "PUT") + ) + assert result is None + + def test_get_collections_no_match(self): + """GET on collections should return None""" + app = MagicMock() + middleware = PEPMiddleware( + app, + pdp_client=MagicMock(), + resource_extractor=MagicMock(), + protected_routes=DEFAULT_PROTECTED_ROUTES, + ) + result = middleware._get_matching_scope_and_route( + _request("/collections", "GET") + ) + assert result is None + + +class TestStacProtectedRoutes: + """STAC_PROTECTED_ROUTES (all collection and item write operations)""" + + @pytest.fixture + def middleware(self): + """PEP Middlware mock""" + app = MagicMock() + return PEPMiddleware( + app, + pdp_client=MagicMock(), + resource_extractor=MagicMock(), + protected_routes=STAC_PROTECTED_ROUTES, + ) + + def test_post_collections_matches_create(self, middleware): + """POST /collections matches with scope create""" + result = middleware._get_matching_scope_and_route( + _request("/api/stac/collections", "POST") + ) + assert result == ("create", "POST") + + def test_put_collection_matches_update(self, middleware): + """PUT /collections/{id} matches with scope update""" + result = middleware._get_matching_scope_and_route( + _request("/api/stac/collections/some-collection", "PUT") + ) + assert result == ("update", "PUT") + + def test_patch_collection_matches_update(self, middleware): + """PATCH /collections/{id} matches with scope update""" + result = middleware._get_matching_scope_and_route( + _request("/api/stac/collections/some-collection", "PATCH") + ) + assert result == ("update", "PATCH") + + def test_delete_collection_matches_delete(self, middleware): + """DELETE /collections/{id} matches with scope delete""" + result = middleware._get_matching_scope_and_route( + _request("/api/stac/collections/some-collection", "DELETE") + ) + assert result == ("delete", "DELETE") + + def test_get_collections_no_match(self, middleware): + """GET /collections does not match so it returns None""" + result = middleware._get_matching_scope_and_route( + _request("/api/stac/collections", "GET") + ) + assert result is None + + def test_search_no_match(self, middleware): + """POST /search does not match so it returns None""" + result = middleware._get_matching_scope_and_route( + _request("/api/stac/search", "POST") + ) + assert result is None diff --git a/common/auth/veda_auth/pep_middleware.py b/common/auth/veda_auth/pep_middleware.py index ce90a238..1ed01d9b 100644 --- a/common/auth/veda_auth/pep_middleware.py +++ b/common/auth/veda_auth/pep_middleware.py @@ -11,7 +11,10 @@ ResourceNotFoundError, TokenError, ) -from veda_auth.resource_extractors import COLLECTIONS_CREATE_PATH_RE +from veda_auth.resource_extractors import ( + COLLECTIONS_CREATE_PATH_RE, + COLLECTIONS_PATH_RE, +) from starlette.middleware.base import BaseHTTPMiddleware from starlette.requests import Request @@ -35,8 +38,20 @@ class ProtectedRoute: scope: str -DEFAULT_PROTECTED_ROUTES: Sequence[ProtectedRoute] = ( +CREATE_COLLECTION_ROUTE = ProtectedRoute( + path_re=COLLECTIONS_CREATE_PATH_RE, + method="POST", + scope="create", +) + +DEFAULT_PROTECTED_ROUTES: Sequence[ProtectedRoute] = (CREATE_COLLECTION_ROUTE,) + + +STAC_PROTECTED_ROUTES: Sequence[ProtectedRoute] = ( + # Collections ProtectedRoute(path_re=COLLECTIONS_CREATE_PATH_RE, method="POST", scope="create"), + ProtectedRoute(path_re=COLLECTIONS_PATH_RE, method="PUT", scope="update"), + ProtectedRoute(path_re=COLLECTIONS_PATH_RE, method="PATCH", scope="update"), ) diff --git a/stac_api/runtime/src/app.py b/stac_api/runtime/src/app.py index bf38094a..9500988d 100644 --- a/stac_api/runtime/src/app.py +++ b/stac_api/runtime/src/app.py @@ -56,6 +56,7 @@ async def lifespan(app: FastAPI): postgres_settings=api_settings.postgres_settings, add_write_connection_pool=True, ) + yield await close_db_connection(app) @@ -179,13 +180,14 @@ def _get_keycloak_pdp_client(): "PEP middleware enabled, secret_name=%s", api_settings.keycloak_uma_resource_server_client_secret_name, ) - from veda_auth.pep_middleware import PEPMiddleware + from veda_auth.pep_middleware import STAC_PROTECTED_ROUTES, PEPMiddleware from veda_auth.resource_extractors import extract_stac_resource_id app.add_middleware( PEPMiddleware, pdp_client=_get_keycloak_pdp_client, resource_extractor=extract_stac_resource_id, + protected_routes=STAC_PROTECTED_ROUTES, ) else: logger.info( diff --git a/stac_api/runtime/tests/test_pep_integration.py b/stac_api/runtime/tests/test_pep_integration.py index 1ddf9ce6..9e1d0a6b 100644 --- a/stac_api/runtime/tests/test_pep_integration.py +++ b/stac_api/runtime/tests/test_pep_integration.py @@ -1,4 +1,5 @@ """Integration tests for PEP middleware""" +import copy import importlib import os import uuid @@ -13,6 +14,8 @@ from stac_fastapi.pgstac.db import close_db_connection, connect_to_db +from .conftest import VALID_ITEM + VALID_COLLECTION_TEMPLATE = { "type": "Collection", "title": "Test Collection for PEP", @@ -105,6 +108,18 @@ def _collection(tenant: Optional[str] = None) -> dict: return body +def _item(collection_id: str, item_id: Optional[str] = None) -> dict: + """Build a STAC item""" + item_id_value = item_id or f"pep-item-{uuid.uuid4().hex[:8]}" + item = copy.deepcopy(VALID_ITEM) + item["id"] = item_id_value + item["collection"] = collection_id + return item + + +AUTH_HEADERS = {"Authorization": "Bearer fake-valid-token"} + + class TestPEPIntegration: """Integration tests for PEP middleware for POST /collections endpoint""" @@ -243,3 +258,99 @@ async def test_get_collections_not_affected_by_pep(self, pep_client): """GET /collections should not be intercepted by PEP (because its not a protected route)""" response = await pep_client.get(COLLECTIONS_ENDPOINT) assert response.status_code == 200 + + +class TestPEPCollectionUpdateDelete: + """PEP for PUT/PATCH/DELETE collection""" + + @pytest.mark.asyncio + async def test_put_collection_no_token_returns_401( + self, pep_client, mock_pdp_client + ): + """PUT /collections/{id} without token returns 401""" + collection = _collection() + # Create collection, then try to update it without token + await pep_client.post( + COLLECTIONS_ENDPOINT, json=collection, headers=AUTH_HEADERS + ) + response = await pep_client.put( + f"{COLLECTIONS_ENDPOINT}/{collection['id']}", + json=collection, + ) + # Should fail with 401 + assert response.status_code == 401 + # Cleanup + await pep_client.delete( + f"{COLLECTIONS_ENDPOINT}/{collection['id']}", headers=AUTH_HEADERS + ) + + @pytest.mark.asyncio + async def test_put_collection_authorized_succeeds( + self, pep_client, mock_pdp_client + ): + """PUT /collections/{id} with token and PDP allow succeeds, scope update""" + # Test setup + mock_pdp_client.check_permission.return_value = True + collection = _collection() + await pep_client.post( + COLLECTIONS_ENDPOINT, json=collection, headers=AUTH_HEADERS + ) + response = await pep_client.put( + f"{COLLECTIONS_ENDPOINT}/{collection['id']}", + json=collection, + headers=AUTH_HEADERS, + ) + assert response.status_code == 200 + call_kwargs = mock_pdp_client.check_permission.call_args + assert call_kwargs.kwargs.get("scope") == "update" + + # Test cleanup + await pep_client.delete( + f"{COLLECTIONS_ENDPOINT}/{collection['id']}", headers=AUTH_HEADERS + ) + + @pytest.mark.asyncio + async def test_patch_collection_no_token_returns_401( + self, pep_client, mock_pdp_client + ): + """PATCH /collections/{id} without token returns 401""" + # Test setup + collection = _collection() + await pep_client.post( + COLLECTIONS_ENDPOINT, json=collection, headers=AUTH_HEADERS + ) + response = await pep_client.patch( + f"{COLLECTIONS_ENDPOINT}/{collection['id']}", + json={"description": "Updated"}, + ) + assert response.status_code == 401 + + # Test cleanup + await pep_client.delete( + f"{COLLECTIONS_ENDPOINT}/{collection['id']}", headers=AUTH_HEADERS + ) + + @pytest.mark.asyncio + async def test_patch_collection_authorized_succeeds( + self, pep_client, mock_pdp_client + ): + """PATCH /collections/{id} with token succeeds, scope update""" + # Test setup + mock_pdp_client.check_permission.return_value = True + collection = _collection() + await pep_client.post( + COLLECTIONS_ENDPOINT, json=collection, headers=AUTH_HEADERS + ) + response = await pep_client.patch( + f"{COLLECTIONS_ENDPOINT}/{collection['id']}", + json={"description": "Updated"}, + headers=AUTH_HEADERS, + ) + assert response.status_code == 200 + call_kwargs = mock_pdp_client.check_permission.call_args + assert call_kwargs.kwargs.get("scope") == "update" + + # Test cleanup + await pep_client.delete( + f"{COLLECTIONS_ENDPOINT}/{collection['id']}", headers=AUTH_HEADERS + )