Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions common/auth/tests/test_pep_middleware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
"""Unit tests for PEP middleware route matching"""

from unittest.mock import MagicMock

import pytest
from veda_auth.pep_middleware import (
DEFAULT_PROTECTED_ROUTES,
STAC_PROTECTED_ROUTES,
PEPMiddleware,
)


def _request(path: str, method: str = "GET"):
"""Request mock for route matching"""
req = MagicMock()
req.url.path = path.rstrip("/") or "/"
req.method = method.upper()
return req


class TestDefaultProtectedRoutes:
"""DEFAULT_PROTECTED_ROUTES tests"""

def test_post_collections_matches(self):
"""POST collections should return create and POST"""
app = MagicMock()
middleware = PEPMiddleware(
app,
pdp_client=MagicMock(),
resource_extractor=MagicMock(),
protected_routes=DEFAULT_PROTECTED_ROUTES,
)
result = middleware._get_matching_scope_and_route(
_request("/collections", "POST")
)
assert result == ("create", "POST")

def test_put_collections_no_match(self):
"""PUT /collections/{id} does not match DEFAULT_PROTECTED_ROUTES so it returns None"""
app = MagicMock()
middleware = PEPMiddleware(
app,
pdp_client=MagicMock(),
resource_extractor=MagicMock(),
protected_routes=DEFAULT_PROTECTED_ROUTES,
)
result = middleware._get_matching_scope_and_route(
_request("/collections/random", "PUT")
)
assert result is None

def test_get_collections_no_match(self):
"""GET on collections should return None"""
app = MagicMock()
middleware = PEPMiddleware(
app,
pdp_client=MagicMock(),
resource_extractor=MagicMock(),
protected_routes=DEFAULT_PROTECTED_ROUTES,
)
result = middleware._get_matching_scope_and_route(
_request("/collections", "GET")
)
assert result is None


class TestStacProtectedRoutes:
"""STAC_PROTECTED_ROUTES (all collection and item write operations)"""

@pytest.fixture
def middleware(self):
"""PEP Middlware mock"""
app = MagicMock()
return PEPMiddleware(
app,
pdp_client=MagicMock(),
resource_extractor=MagicMock(),
protected_routes=STAC_PROTECTED_ROUTES,
)

def test_post_collections_matches_create(self, middleware):
"""POST /collections matches with scope create"""
result = middleware._get_matching_scope_and_route(
_request("/api/stac/collections", "POST")
)
assert result == ("create", "POST")

def test_put_collection_matches_update(self, middleware):
"""PUT /collections/{id} matches with scope update"""
result = middleware._get_matching_scope_and_route(
_request("/api/stac/collections/some-collection", "PUT")
)
assert result == ("update", "PUT")

def test_patch_collection_matches_update(self, middleware):
"""PATCH /collections/{id} matches with scope update"""
result = middleware._get_matching_scope_and_route(
_request("/api/stac/collections/some-collection", "PATCH")
)
assert result == ("update", "PATCH")

def test_delete_collection_matches_delete(self, middleware):
"""DELETE /collections/{id} matches with scope delete"""
result = middleware._get_matching_scope_and_route(
_request("/api/stac/collections/some-collection", "DELETE")
)
assert result == ("delete", "DELETE")

def test_get_collections_no_match(self, middleware):
"""GET /collections does not match so it returns None"""
result = middleware._get_matching_scope_and_route(
_request("/api/stac/collections", "GET")
)
assert result is None

def test_search_no_match(self, middleware):
"""POST /search does not match so it returns None"""
result = middleware._get_matching_scope_and_route(
_request("/api/stac/search", "POST")
)
assert result is None
19 changes: 17 additions & 2 deletions common/auth/veda_auth/pep_middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
ResourceNotFoundError,
TokenError,
)
from veda_auth.resource_extractors import COLLECTIONS_CREATE_PATH_RE
from veda_auth.resource_extractors import (
COLLECTIONS_CREATE_PATH_RE,
COLLECTIONS_PATH_RE,
)

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
Expand All @@ -35,8 +38,20 @@ class ProtectedRoute:
scope: str


DEFAULT_PROTECTED_ROUTES: Sequence[ProtectedRoute] = (
CREATE_COLLECTION_ROUTE = ProtectedRoute(
path_re=COLLECTIONS_CREATE_PATH_RE,
method="POST",
scope="create",
)

DEFAULT_PROTECTED_ROUTES: Sequence[ProtectedRoute] = (CREATE_COLLECTION_ROUTE,)


STAC_PROTECTED_ROUTES: Sequence[ProtectedRoute] = (
# Collections
ProtectedRoute(path_re=COLLECTIONS_CREATE_PATH_RE, method="POST", scope="create"),
ProtectedRoute(path_re=COLLECTIONS_PATH_RE, method="PUT", scope="update"),
ProtectedRoute(path_re=COLLECTIONS_PATH_RE, method="PATCH", scope="update"),
)


Expand Down
4 changes: 3 additions & 1 deletion stac_api/runtime/src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ async def lifespan(app: FastAPI):
postgres_settings=api_settings.postgres_settings,
add_write_connection_pool=True,
)

yield
await close_db_connection(app)

Expand Down Expand Up @@ -179,13 +180,14 @@ def _get_keycloak_pdp_client():
"PEP middleware enabled, secret_name=%s",
api_settings.keycloak_uma_resource_server_client_secret_name,
)
from veda_auth.pep_middleware import PEPMiddleware
from veda_auth.pep_middleware import STAC_PROTECTED_ROUTES, PEPMiddleware
from veda_auth.resource_extractors import extract_stac_resource_id

app.add_middleware(
PEPMiddleware,
pdp_client=_get_keycloak_pdp_client,
resource_extractor=extract_stac_resource_id,
protected_routes=STAC_PROTECTED_ROUTES,
)
else:
logger.info(
Expand Down
111 changes: 111 additions & 0 deletions stac_api/runtime/tests/test_pep_integration.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Integration tests for PEP middleware"""
import copy
import importlib
import os
import uuid
Expand All @@ -13,6 +14,8 @@

from stac_fastapi.pgstac.db import close_db_connection, connect_to_db

from .conftest import VALID_ITEM

VALID_COLLECTION_TEMPLATE = {
"type": "Collection",
"title": "Test Collection for PEP",
Expand Down Expand Up @@ -105,6 +108,18 @@ def _collection(tenant: Optional[str] = None) -> dict:
return body


def _item(collection_id: str, item_id: Optional[str] = None) -> dict:
"""Build a STAC item"""
item_id_value = item_id or f"pep-item-{uuid.uuid4().hex[:8]}"
item = copy.deepcopy(VALID_ITEM)
item["id"] = item_id_value
item["collection"] = collection_id
return item


AUTH_HEADERS = {"Authorization": "Bearer fake-valid-token"}


class TestPEPIntegration:
"""Integration tests for PEP middleware for POST /collections endpoint"""

Expand Down Expand Up @@ -243,3 +258,99 @@ async def test_get_collections_not_affected_by_pep(self, pep_client):
"""GET /collections should not be intercepted by PEP (because its not a protected route)"""
response = await pep_client.get(COLLECTIONS_ENDPOINT)
assert response.status_code == 200


class TestPEPCollectionUpdateDelete:
"""PEP for PUT/PATCH/DELETE collection"""

@pytest.mark.asyncio
async def test_put_collection_no_token_returns_401(
self, pep_client, mock_pdp_client
):
"""PUT /collections/{id} without token returns 401"""
collection = _collection()
# Create collection, then try to update it without token
await pep_client.post(
COLLECTIONS_ENDPOINT, json=collection, headers=AUTH_HEADERS
)
response = await pep_client.put(
f"{COLLECTIONS_ENDPOINT}/{collection['id']}",
json=collection,
)
# Should fail with 401
assert response.status_code == 401
# Cleanup
await pep_client.delete(
f"{COLLECTIONS_ENDPOINT}/{collection['id']}", headers=AUTH_HEADERS
)

@pytest.mark.asyncio
async def test_put_collection_authorized_succeeds(
self, pep_client, mock_pdp_client
):
"""PUT /collections/{id} with token and PDP allow succeeds, scope update"""
# Test setup
mock_pdp_client.check_permission.return_value = True
collection = _collection()
await pep_client.post(
COLLECTIONS_ENDPOINT, json=collection, headers=AUTH_HEADERS
)
response = await pep_client.put(
f"{COLLECTIONS_ENDPOINT}/{collection['id']}",
json=collection,
headers=AUTH_HEADERS,
)
assert response.status_code == 200
call_kwargs = mock_pdp_client.check_permission.call_args
assert call_kwargs.kwargs.get("scope") == "update"

# Test cleanup
await pep_client.delete(
f"{COLLECTIONS_ENDPOINT}/{collection['id']}", headers=AUTH_HEADERS
)

@pytest.mark.asyncio
async def test_patch_collection_no_token_returns_401(
self, pep_client, mock_pdp_client
):
"""PATCH /collections/{id} without token returns 401"""
# Test setup
collection = _collection()
await pep_client.post(
COLLECTIONS_ENDPOINT, json=collection, headers=AUTH_HEADERS
)
response = await pep_client.patch(
f"{COLLECTIONS_ENDPOINT}/{collection['id']}",
json={"description": "Updated"},
)
assert response.status_code == 401

# Test cleanup
await pep_client.delete(
f"{COLLECTIONS_ENDPOINT}/{collection['id']}", headers=AUTH_HEADERS
)

@pytest.mark.asyncio
async def test_patch_collection_authorized_succeeds(
self, pep_client, mock_pdp_client
):
"""PATCH /collections/{id} with token succeeds, scope update"""
# Test setup
mock_pdp_client.check_permission.return_value = True
collection = _collection()
await pep_client.post(
COLLECTIONS_ENDPOINT, json=collection, headers=AUTH_HEADERS
)
response = await pep_client.patch(
f"{COLLECTIONS_ENDPOINT}/{collection['id']}",
json={"description": "Updated"},
headers=AUTH_HEADERS,
)
assert response.status_code == 200
call_kwargs = mock_pdp_client.check_permission.call_args
assert call_kwargs.kwargs.get("scope") == "update"

# Test cleanup
await pep_client.delete(
f"{COLLECTIONS_ENDPOINT}/{collection['id']}", headers=AUTH_HEADERS
)
Loading