diff --git a/docs/integrations/keycloak.mdx b/docs/integrations/keycloak.mdx index 22d61f132..e6f3f1c96 100644 --- a/docs/integrations/keycloak.mdx +++ b/docs/integrations/keycloak.mdx @@ -139,3 +139,47 @@ auth = KeycloakAuthProvider( token_verifier=custom_verifier, ) ``` + +## Proxying to Keycloak with Offline Tokens + +`KeycloakAuthProvider` expects clients that can complete Dynamic Client +Registration and the full OAuth flow themselves. When your MCP clients cannot +do that — or when you want FastMCP to act as the OAuth client and broker tokens +on their behalf — use `KeycloakOAuthProxy` instead. It behaves like the generic +`OAuthProxy`, but understands one Keycloak-specific convention that the generic +proxy gets wrong. + +Keycloak issues long-lived *offline* refresh tokens (requested via the +`offline_access` scope) and signals "this refresh token has no fixed expiry" by +returning `refresh_expires_in=0`. The generic `OAuthProxy` interprets `0` as an +unknown/zero lifetime, so the FastMCP refresh token TTL shrinks on every refresh +cycle until it hits zero and forces the user to re-authenticate — even though +the underlying Keycloak offline token is still valid. `KeycloakOAuthProxy` +treats `0` as the never-expires sentinel and keeps offline sessions alive +indefinitely. + +Because offline tokens require the `offline_access` scope at registration time, +pass `valid_scopes` so clients can register for it. Keep `required_scopes` +focused on what every access token must carry — `valid_scopes` controls what +Dynamic Client Registration will accept, independently of token verification: + +```python +from fastmcp import FastMCP +from fastmcp.server.auth.providers.keycloak import KeycloakOAuthProxy + +auth = KeycloakOAuthProxy( + realm_url="https://keycloak.example.com/realms/myrealm", + upstream_client_id="my-client", + upstream_client_secret="my-secret", + base_url="https://my-mcp-server.example.com", + jwt_signing_key="a-strong-signing-secret", + required_scopes=["openid"], + valid_scopes=["openid", "offline_access"], +) + +mcp = FastMCP("My App", auth=auth) +``` + +The FastMCP refresh token is reissued on every refresh cycle, so as long as the +client keeps refreshing, the offline session remains valid without further user +interaction. diff --git a/fastmcp_slim/fastmcp/server/auth/oauth_proxy/models.py b/fastmcp_slim/fastmcp/server/auth/oauth_proxy/models.py index 5d2366a76..57f56047d 100644 --- a/fastmcp_slim/fastmcp/server/auth/oauth_proxy/models.py +++ b/fastmcp_slim/fastmcp/server/auth/oauth_proxy/models.py @@ -92,6 +92,7 @@ class UpstreamTokenSet(BaseModel): refresh_token_expires_at: ( float | None ) # Unix timestamp when refresh token expires (if known) + refresh_token_never_expires: bool = False # True = upstream explicitly said "no expiry" (e.g. Keycloak offline_access) expires_at: float # Unix timestamp when access token expires token_type: str # Usually "Bearer" scope: str # Space-separated scopes diff --git a/fastmcp_slim/fastmcp/server/auth/oauth_proxy/proxy.py b/fastmcp_slim/fastmcp/server/auth/oauth_proxy/proxy.py index c94d43b21..698cc2885 100644 --- a/fastmcp_slim/fastmcp/server/auth/oauth_proxy/proxy.py +++ b/fastmcp_slim/fastmcp/server/auth/oauth_proxy/proxy.py @@ -231,6 +231,18 @@ class OAuthProxy(OAuthProvider, ConsentMixin): - Generic: Works with any spec-compliant provider """ + def _upstream_refresh_token_never_expires(self, refresh_expires_in: int) -> bool: + """Return True if the upstream's refresh_expires_in value signals the token never expires. + + Override in subclasses to handle provider-specific conventions. The base + implementation always returns False — an unknown value is treated as a + normal (finite) expiry and falls through to the configured fallback TTL. + + Args: + refresh_expires_in: The raw integer value from the upstream token response. + """ + return False + def __init__( self, *, @@ -1090,22 +1102,35 @@ async def exchange_authorization_code( # Some providers include refresh_expires_in, some don't refresh_expires_in = None refresh_token_expires_at = None + refresh_token_never_expires = False if idp_tokens.get("refresh_token"): - if "refresh_expires_in" in idp_tokens and int( - idp_tokens["refresh_expires_in"] - ): - refresh_expires_in = int(idp_tokens["refresh_expires_in"]) - refresh_token_expires_at = time.time() + refresh_expires_in - logger.debug( - "Upstream refresh token expires in %d seconds", refresh_expires_in - ) - else: - # Upstream didn't specify; use configured fallback (default 1 year). - # The FastMCP refresh JWT is just a signed pointer — if the real - # upstream refresh has expired or been revoked, the next refresh - # call to upstream will fail and the client re-auths. + if "refresh_expires_in" in idp_tokens: + val = int(idp_tokens["refresh_expires_in"]) + if val > 0: + refresh_expires_in = val + refresh_token_expires_at = time.time() + refresh_expires_in + logger.debug( + "Upstream refresh token expires in %d seconds", + refresh_expires_in, + ) + elif val == 0 and self._upstream_refresh_token_never_expires(val): + # Provider explicitly signals "no expiry" (e.g. Keycloak offline_access). + # refresh_token_expires_at stays None (no upstream expiry to track). + # We still need a finite FastMCP RT TTL; use the configured fallback. + # The FastMCP RT is renewed on every transparent refresh, so active + # sessions roll forward automatically without ever hitting a hard wall. + refresh_token_never_expires = True + logger.debug( + "Upstream refresh_expires_in=0 (never expires); " + "FastMCP RT will use fallback TTL of %d seconds", + self._fallback_refresh_token_expiry_seconds, + ) + if refresh_expires_in is None: + # Upstream didn't specify expiry; use configured fallback (default 1 year). refresh_expires_in = self._fallback_refresh_token_expiry_seconds - refresh_token_expires_at = time.time() + refresh_expires_in + if not refresh_token_never_expires: + # Track wall-clock expiry only for tokens with a real deadline. + refresh_token_expires_at = time.time() + refresh_expires_in logger.debug( "Upstream refresh token expiry unknown, using fallback %d seconds", refresh_expires_in, @@ -1119,6 +1144,7 @@ async def exchange_authorization_code( if idp_tokens.get("refresh_token") else None, refresh_token_expires_at=refresh_token_expires_at, + refresh_token_never_expires=refresh_token_never_expires, expires_at=time.time() + expires_in, token_type=idp_tokens.get("token_type", "Bearer"), scope=" ".join(granted_scopes), @@ -1408,28 +1434,35 @@ async def exchange_refresh_token( logger.debug("Upstream refresh token rotated") # Update refresh token expiry if provided - if "refresh_expires_in" in token_response and int( - token_response["refresh_expires_in"] - ): - new_refresh_expires_in = int(token_response["refresh_expires_in"]) - upstream_token_set.refresh_token_expires_at = ( - time.time() + new_refresh_expires_in - ) - logger.debug( - "Upstream refresh token expires in %d seconds", - new_refresh_expires_in, - ) - elif upstream_token_set.refresh_token_expires_at: - # Keep existing expiry if upstream doesn't provide new one - new_refresh_expires_in = int( - upstream_token_set.refresh_token_expires_at - time.time() - ) - else: - # Upstream rotated the refresh token but gave no expiry; use fallback - new_refresh_expires_in = self._fallback_refresh_token_expiry_seconds - upstream_token_set.refresh_token_expires_at = ( - time.time() + new_refresh_expires_in - ) + if "refresh_expires_in" in token_response: + val = int(token_response["refresh_expires_in"]) + if val > 0: + new_refresh_expires_in = val + upstream_token_set.refresh_token_expires_at = ( + time.time() + new_refresh_expires_in + ) + logger.debug( + "Upstream refresh token expires in %d seconds", + new_refresh_expires_in, + ) + elif val == 0 and self._upstream_refresh_token_never_expires(val): + # Provider signals "no expiry" — mark and clear stale wall-clock time + # so the fallback below always issues a fresh full-length FastMCP RT. + upstream_token_set.refresh_token_never_expires = True + upstream_token_set.refresh_token_expires_at = None + if new_refresh_expires_in is None: + if upstream_token_set.refresh_token_expires_at: + # Keep existing expiry if upstream doesn't provide new one + new_refresh_expires_in = int( + upstream_token_set.refresh_token_expires_at - time.time() + ) + else: + # Upstream rotated the refresh token but gave no expiry (or never-expires); use fallback + new_refresh_expires_in = self._fallback_refresh_token_expiry_seconds + if not upstream_token_set.refresh_token_never_expires: + upstream_token_set.refresh_token_expires_at = ( + time.time() + new_refresh_expires_in + ) upstream_token_set.raw_token_data = { **upstream_token_set.raw_token_data, @@ -1623,22 +1656,28 @@ async def _try_transparent_refresh( if new_upstream_refresh := token_response.get("refresh_token"): if new_upstream_refresh != upstream_token_set.refresh_token: upstream_token_set.refresh_token = new_upstream_refresh - if "refresh_expires_in" in token_response and int( - token_response["refresh_expires_in"] - ): - new_refresh_expires_in = int(token_response["refresh_expires_in"]) - upstream_token_set.refresh_token_expires_at = ( - time.time() + new_refresh_expires_in - ) - elif upstream_token_set.refresh_token_expires_at: - new_refresh_expires_in = int( - upstream_token_set.refresh_token_expires_at - time.time() - ) - else: - new_refresh_expires_in = self._fallback_refresh_token_expiry_seconds - upstream_token_set.refresh_token_expires_at = ( - time.time() + new_refresh_expires_in - ) + if "refresh_expires_in" in token_response: + val = int(token_response["refresh_expires_in"]) + if val > 0: + new_refresh_expires_in = val + upstream_token_set.refresh_token_expires_at = ( + time.time() + new_refresh_expires_in + ) + elif val == 0 and self._upstream_refresh_token_never_expires(val): + # Provider signals "no expiry" — mark and clear stale wall-clock time. + upstream_token_set.refresh_token_never_expires = True + upstream_token_set.refresh_token_expires_at = None + if new_refresh_expires_in is None: + if upstream_token_set.refresh_token_expires_at: + new_refresh_expires_in = int( + upstream_token_set.refresh_token_expires_at - time.time() + ) + else: + new_refresh_expires_in = self._fallback_refresh_token_expiry_seconds + if not upstream_token_set.refresh_token_never_expires: + upstream_token_set.refresh_token_expires_at = ( + time.time() + new_refresh_expires_in + ) upstream_token_set.raw_token_data = { **upstream_token_set.raw_token_data, diff --git a/fastmcp_slim/fastmcp/server/auth/providers/keycloak.py b/fastmcp_slim/fastmcp/server/auth/providers/keycloak.py index d018bc4b0..17f7e9cb9 100644 --- a/fastmcp_slim/fastmcp/server/auth/providers/keycloak.py +++ b/fastmcp_slim/fastmcp/server/auth/providers/keycloak.py @@ -2,13 +2,19 @@ from __future__ import annotations +from typing import TYPE_CHECKING, Literal + from pydantic import AnyHttpUrl from fastmcp.server.auth import RemoteAuthProvider, TokenVerifier +from fastmcp.server.auth.oauth_proxy import OAuthProxy from fastmcp.server.auth.providers.jwt import JWTVerifier from fastmcp.utilities.auth import parse_scopes from fastmcp.utilities.logging import get_logger +if TYPE_CHECKING: + from key_value.aio.protocols import AsyncKeyValue + logger = get_logger(__name__) @@ -72,3 +78,166 @@ def __init__( authorization_servers=[AnyHttpUrl(self.realm_url)], base_url=AnyHttpUrl(str(base_url).rstrip("/")), ) + + +class KeycloakOAuthProxy(OAuthProxy): + """OAuth proxy for Keycloak identity providers. + + Use this instead of `OAuthProxy` when proxying to Keycloak. It handles + Keycloak-specific token response conventions, most importantly + `refresh_expires_in=0`, which Keycloak uses to indicate that a refresh + token obtained with the `offline_access` scope never expires. Standard + `OAuthProxy` treats `0` as an unknown expiry, which causes the FastMCP + refresh token TTL to shrink on every refresh cycle until it hits zero + and forces the user to re-authenticate — even though the Keycloak + offline token is still valid. + + All other behaviour is identical to `OAuthProxy`. Pass `realm_url` for + automatic endpoint discovery, or supply the individual endpoint URLs + directly. + + Example: + ```python + from fastmcp import FastMCP + from fastmcp.server.auth.providers.keycloak import KeycloakOAuthProxy + + auth = KeycloakOAuthProxy( + realm_url="https://keycloak.example.com/realms/myrealm", + upstream_client_id="my-client", + upstream_client_secret="my-secret", + base_url="https://my-mcp-server.example.com", + jwt_signing_key="some-secret", + ) + + mcp = FastMCP("My App", auth=auth) + ``` + """ + + # Keycloak uses refresh_expires_in=0 for offline_access tokens ("never expires"). + def _upstream_refresh_token_never_expires(self, refresh_expires_in: int) -> bool: + return refresh_expires_in == 0 + + def __init__( + self, + *, + realm_url: AnyHttpUrl | str | None = None, + upstream_client_id: str, + upstream_client_secret: str | None = None, + base_url: AnyHttpUrl | str, + resource_base_url: AnyHttpUrl | str | None = None, + issuer_url: AnyHttpUrl | str | None = None, + required_scopes: list[str] | str | None = None, + valid_scopes: list[str] | str | None = None, + audience: str | list[str] | None = None, + token_verifier: TokenVerifier | None = None, + # Direct endpoint overrides (optional if realm_url is provided) + upstream_authorization_endpoint: str | None = None, + upstream_token_endpoint: str | None = None, + upstream_revocation_endpoint: str | None = None, + redirect_path: str | None = None, + # Pass-through OAuthProxy options + jwt_signing_key: str | bytes | None = None, + client_storage: AsyncKeyValue | None = None, + require_authorization_consent: bool | Literal["remember", "external"] = True, + allowed_client_redirect_uris: list[str] | None = None, + fallback_refresh_token_expiry_seconds: int | None = None, + ): + """Initialize the Keycloak OAuth proxy. + + Args: + realm_url: Keycloak realm URL (e.g., "https://keycloak.example.com/realms/myrealm"). + Used to derive authorization and token endpoints automatically. Required unless + `upstream_authorization_endpoint` and `upstream_token_endpoint` are both provided. + upstream_client_id: Client ID of the application registered in Keycloak. + upstream_client_secret: Client secret. Optional for public clients. + base_url: Public URL of this FastMCP server. + resource_base_url: Optional public base URL for the protected resource + metadata and token audience. Defaults to `base_url`. Set this when + the resource is served under a different origin/path than the + proxy (e.g. behind a gateway). + issuer_url: Issuer URL for OAuth metadata. Defaults to `base_url`. Set + this when the authorization-server metadata must advertise a + different issuer than `base_url` (e.g. behind a gateway). + required_scopes: Scopes to require on incoming tokens. Defaults to `["openid"]`. + valid_scopes: Scopes advertised to clients via the `/.well-known` endpoints + and accepted during Dynamic Client Registration. Defaults to + `required_scopes`. Set this to include `offline_access` so clients can + register for the long-lived offline refresh tokens this proxy handles + without also forcing `offline_access` onto every access token. + audience: Optional JWT audience for token validation. Recommended for production. + token_verifier: Custom token verifier. Defaults to a JWTVerifier configured + for the Keycloak realm's JWKS endpoint. + upstream_authorization_endpoint: Override the authorization endpoint URL. + Required if `realm_url` is not provided. + upstream_token_endpoint: Override the token endpoint URL. + Required if `realm_url` is not provided. + upstream_revocation_endpoint: Optional token revocation endpoint. + redirect_path: Callback path registered with the upstream Keycloak + client. Defaults to `/auth/callback`. Set this when migrating an + existing deployment whose Keycloak client uses a non-default + callback path, so the authorization request keeps matching the + registered redirect URI. + jwt_signing_key: Secret for signing FastMCP JWTs. + client_storage: Storage backend for OAuth state. + require_authorization_consent: Consent screen behaviour (default True). + allowed_client_redirect_uris: Allowed MCP client redirect URI patterns. + fallback_refresh_token_expiry_seconds: FastMCP RT lifetime when Keycloak + returns `refresh_expires_in=0`. Defaults to 1 year. The token is + re-issued automatically on every refresh cycle, so active sessions + remain valid indefinitely. + """ + if realm_url is None and ( + upstream_authorization_endpoint is None or upstream_token_endpoint is None + ): + raise ValueError( + "Either realm_url or both upstream_authorization_endpoint and " + "upstream_token_endpoint must be provided." + ) + + realm = str(realm_url).rstrip("/") if realm_url else None + oidc_base = f"{realm}/protocol/openid-connect" if realm else None + + resolved_auth_endpoint = upstream_authorization_endpoint or f"{oidc_base}/auth" + resolved_token_endpoint = upstream_token_endpoint or f"{oidc_base}/token" + resolved_revocation_endpoint = upstream_revocation_endpoint or ( + f"{oidc_base}/revoke" if oidc_base else None + ) + + parsed_scopes = ( + parse_scopes(required_scopes) if required_scopes is not None else ["openid"] + ) + parsed_valid_scopes = ( + parse_scopes(valid_scopes) if valid_scopes is not None else None + ) + + if token_verifier is None: + if realm is None: + raise ValueError( + "token_verifier must be provided when realm_url is not set." + ) + token_verifier = JWTVerifier( + jwks_uri=f"{realm}/protocol/openid-connect/certs", + issuer=realm, + algorithm="RS256", + required_scopes=parsed_scopes, + audience=audience, + ) + + super().__init__( + upstream_authorization_endpoint=resolved_auth_endpoint, + upstream_token_endpoint=resolved_token_endpoint, + upstream_revocation_endpoint=resolved_revocation_endpoint, + upstream_client_id=upstream_client_id, + upstream_client_secret=upstream_client_secret, + token_verifier=token_verifier, + valid_scopes=parsed_valid_scopes, + redirect_path=redirect_path, + base_url=base_url, + resource_base_url=resource_base_url, + issuer_url=issuer_url, + jwt_signing_key=jwt_signing_key, + client_storage=client_storage, + require_authorization_consent=require_authorization_consent, + allowed_client_redirect_uris=allowed_client_redirect_uris, + fallback_refresh_token_expiry_seconds=fallback_refresh_token_expiry_seconds, + ) diff --git a/tests/server/auth/oauth_proxy/test_tokens.py b/tests/server/auth/oauth_proxy/test_tokens.py index 635819fe4..21f848fd0 100644 --- a/tests/server/auth/oauth_proxy/test_tokens.py +++ b/tests/server/auth/oauth_proxy/test_tokens.py @@ -29,6 +29,7 @@ _hash_token, ) from fastmcp.server.auth.providers.jwt import JWTVerifier +from fastmcp.server.auth.providers.keycloak import KeycloakOAuthProxy class TestOAuthProxyTokenEndpointAuth: @@ -822,6 +823,37 @@ async def test_ttl_uses_refresh_when_refresh_longer_than_access(self, proxy): ) assert upstream_tokens is not None + +class TestKeycloakOAuthProxy: + """Tests specific to KeycloakOAuthProxy's offline token handling. + + Keycloak returns refresh_expires_in=0 for offline_access tokens to signal + "no time-based expiry". KeycloakOAuthProxy must handle this correctly: + issue a refresh token, and never let the FastMCP RT TTL shrink across + repeated refresh cycles. + """ + + @pytest.fixture + def jwt_verifier(self): + verifier = Mock(spec=TokenVerifier) + verifier.required_scopes = ["read", "write"] + verifier.verify_token = AsyncMock(return_value=None) + return verifier + + @pytest.fixture + def proxy(self, jwt_verifier): + proxy = KeycloakOAuthProxy( + realm_url="https://keycloak.example.com/realms/test", + upstream_client_id="test-client", + upstream_client_secret="test-secret", + token_verifier=jwt_verifier, + base_url="https://proxy.example.com", + jwt_signing_key="test-secret-key", + client_storage=MemoryStore(), + ) + proxy.set_mcp_path("/mcp") + return proxy + async def test_refresh_expires_in_zero_issues_refresh_token(self, proxy): """refresh_expires_in=0 should fall back to the configured default. @@ -879,6 +911,306 @@ async def test_refresh_expires_in_zero_issues_refresh_token(self, proxy): ) assert refresh_meta is not None + # Upstream token set should be marked as never-expiring + jti_mapping = await proxy._jti_mapping_store.get( + key=proxy.jwt_issuer.verify_token( + result.refresh_token, expected_token_use="refresh" + )["jti"] + ) + assert jti_mapping is not None + upstream_token_set = await proxy._upstream_token_store.get( + key=jti_mapping.upstream_token_id + ) + assert upstream_token_set is not None + assert upstream_token_set.refresh_token_never_expires is True + assert upstream_token_set.refresh_token_expires_at is None + + def test_valid_scopes_forwarded_for_dcr(self, jwt_verifier): + """valid_scopes must be advertised/accepted for DCR independently of required_scopes. + + Offline tokens (the whole point of this proxy) require clients to register + the `offline_access` scope. Without a separate valid_scopes pass-through, + DCR scope validation falls back to required_scopes and rejects it. + """ + proxy = KeycloakOAuthProxy( + realm_url="https://keycloak.example.com/realms/test", + upstream_client_id="test-client", + upstream_client_secret="test-secret", + token_verifier=jwt_verifier, + valid_scopes=["openid", "offline_access"], + base_url="https://proxy.example.com", + jwt_signing_key="test-secret-key", + client_storage=MemoryStore(), + ) + assert proxy.client_registration_options is not None + assert proxy.client_registration_options.valid_scopes == [ + "openid", + "offline_access", + ] + # required_scopes (token verification) stays independent of valid_scopes (DCR) + assert jwt_verifier.required_scopes == ["read", "write"] + + def test_valid_scopes_accepts_string(self, jwt_verifier): + """A space/comma-delimited string is parsed like required_scopes.""" + proxy = KeycloakOAuthProxy( + realm_url="https://keycloak.example.com/realms/test", + upstream_client_id="test-client", + upstream_client_secret="test-secret", + token_verifier=jwt_verifier, + valid_scopes="openid offline_access", + base_url="https://proxy.example.com", + jwt_signing_key="test-secret-key", + client_storage=MemoryStore(), + ) + assert proxy.client_registration_options is not None + assert proxy.client_registration_options.valid_scopes == [ + "openid", + "offline_access", + ] + + def test_redirect_path_defaults_to_auth_callback(self, jwt_verifier): + proxy = KeycloakOAuthProxy( + realm_url="https://keycloak.example.com/realms/test", + upstream_client_id="test-client", + upstream_client_secret="test-secret", + token_verifier=jwt_verifier, + base_url="https://proxy.example.com", + jwt_signing_key="test-secret-key", + client_storage=MemoryStore(), + ) + assert proxy._redirect_path == "/auth/callback" + + def test_redirect_path_forwarded_for_custom_callback(self, jwt_verifier): + """Migrating a deployment whose Keycloak client uses a non-default + callback path must not silently revert to /auth/callback.""" + proxy = KeycloakOAuthProxy( + realm_url="https://keycloak.example.com/realms/test", + upstream_client_id="test-client", + upstream_client_secret="test-secret", + token_verifier=jwt_verifier, + base_url="https://proxy.example.com", + redirect_path="/custom/oauth/callback", + jwt_signing_key="test-secret-key", + client_storage=MemoryStore(), + ) + assert proxy._redirect_path == "/custom/oauth/callback" + + def test_issuer_and_resource_base_url_default_to_base_url(self, jwt_verifier): + proxy = KeycloakOAuthProxy( + realm_url="https://keycloak.example.com/realms/test", + upstream_client_id="test-client", + upstream_client_secret="test-secret", + token_verifier=jwt_verifier, + base_url="https://proxy.example.com", + jwt_signing_key="test-secret-key", + client_storage=MemoryStore(), + ) + assert str(proxy.issuer_url).rstrip("/") == "https://proxy.example.com" + assert proxy.resource_base_url is None + + def test_issuer_and_resource_base_url_forwarded(self, jwt_verifier): + """Behind a gateway, switching to KeycloakOAuthProxy must not drop the + issuer/resource overrides that OAuthProxy supports (correct metadata + and JWT audience).""" + proxy = KeycloakOAuthProxy( + realm_url="https://keycloak.example.com/realms/test", + upstream_client_id="test-client", + upstream_client_secret="test-secret", + token_verifier=jwt_verifier, + base_url="https://internal.proxy.local", + issuer_url="https://public.gateway.example.com", + resource_base_url="https://public.gateway.example.com/mcp", + jwt_signing_key="test-secret-key", + client_storage=MemoryStore(), + ) + assert str(proxy.issuer_url).rstrip("/") == "https://public.gateway.example.com" + assert proxy.resource_base_url is not None + assert ( + str(proxy.resource_base_url).rstrip("/") + == "https://public.gateway.example.com/mcp" + ) + + async def test_refresh_expires_in_zero_subsequent_refresh_does_not_shrink( + self, proxy + ): + """Repeated refresh cycles with refresh_expires_in=0 must not shrink the RT TTL. + + Keycloak offline tokens return refresh_expires_in=0 on every token response. + The first exchange correctly stores a 1-year FastMCP RT. Subsequent + exchange_refresh_token calls (triggered by the MCP client refreshing its tokens) + must each issue a new FastMCP RT with the same full fallback TTL — not a + gradually decaying value computed from the original wall-clock timestamp. + """ + import jwt as pyjwt + + client = OAuthClientInformationFull( + client_id="kc-shrink-client", + client_secret="test-secret", + redirect_uris=[AnyUrl("http://localhost:12345/callback")], + ) + await proxy.register_client(client) + + # ── step 1: initial authorization code exchange ────────────────────── + client_code = ClientCode( + code="kc-offline-code", + client_id="kc-shrink-client", + redirect_uri="http://localhost:12345/callback", + code_challenge="test-challenge", + code_challenge_method="S256", + scopes=["read"], + idp_tokens={ + "access_token": "upstream-at-v1", + "refresh_token": "upstream-rt-v1", + "expires_in": 3600, + "refresh_expires_in": 0, # Keycloak offline sentinel + "token_type": "Bearer", + }, + expires_at=time.time() + 300, + created_at=time.time(), + ) + await proxy._code_store.put(key=client_code.code, value=client_code) + + auth_code = AuthorizationCode( + code="kc-offline-code", + scopes=["read"], + expires_at=time.time() + 300, + client_id="kc-shrink-client", + code_challenge="test-challenge", + redirect_uri=AnyUrl("http://localhost:12345/callback"), + redirect_uri_provided_explicitly=True, + ) + result1 = await proxy.exchange_authorization_code( + client=client, + authorization_code=auth_code, + ) + assert result1.refresh_token is not None + first_rt = result1.refresh_token + first_payload = pyjwt.decode( + first_rt, options={"verify_signature": False, "verify_exp": False} + ) + first_ttl = first_payload["exp"] - first_payload["iat"] + assert first_ttl > 60 * 60 * 24 * 300, ( + f"First RT TTL {first_ttl}s should be close to 1 year" + ) + + # ── step 2: simulate a later refresh cycle (Keycloak returns 0 again) ─ + async def fake_refresh(url, refresh_token, scope=None, **_kwargs): + return { + "access_token": "upstream-at-v2", + "refresh_token": "upstream-rt-v2", + "expires_in": 3600, + "refresh_expires_in": 0, # same Keycloak sentinel + "token_type": "Bearer", + } + + mock_client = Mock() + mock_client.refresh_token = AsyncMock(side_effect=fake_refresh) + with patch.object( + proxy, "_create_upstream_oauth_client", return_value=mock_client + ): + first_rt_meta = await proxy._refresh_token_store.get( + key=_hash_token(first_rt) + ) + assert first_rt_meta is not None + result2 = await proxy.exchange_refresh_token( + client=client, + refresh_token=RefreshToken( + token=first_rt, + client_id="kc-shrink-client", + scopes=["read"], + expires_at=first_rt_meta.expires_at, + ), + scopes=["read"], + ) + + assert result2.refresh_token is not None + second_payload = pyjwt.decode( + result2.refresh_token, + options={"verify_signature": False, "verify_exp": False}, + ) + second_ttl = second_payload["exp"] - second_payload["iat"] + assert second_ttl > 60 * 60 * 24 * 300, ( + f"Refreshed RT TTL {second_ttl}s shrank — expected close to 1 year" + ) + + async def test_base_proxy_does_not_treat_zero_as_never_expires(self): + """Base OAuthProxy must not interpret refresh_expires_in=0 as never-expires. + + Only KeycloakOAuthProxy opts in to that behaviour. The base proxy should + fall through to the standard fallback (1-year wall-clock timestamp), preserving + existing behaviour for all other providers. + """ + verifier = Mock(spec=TokenVerifier) + verifier.required_scopes = ["read"] + verifier.verify_token = AsyncMock(return_value=None) + + base_proxy = OAuthProxy( + upstream_authorization_endpoint="https://idp.example.com/authorize", + upstream_token_endpoint="https://idp.example.com/token", + upstream_client_id="test-client", + upstream_client_secret="test-secret", + token_verifier=verifier, + base_url="https://proxy.example.com", + jwt_signing_key="test-secret-key", + client_storage=MemoryStore(), + ) + base_proxy.set_mcp_path("/mcp") + + client = OAuthClientInformationFull( + client_id="test-client", + client_secret="test-secret", + redirect_uris=[AnyUrl("http://localhost:12345/callback")], + ) + await base_proxy.register_client(client) + + client_code = ClientCode( + code="base-zero-code", + client_id="test-client", + redirect_uri="http://localhost:12345/callback", + code_challenge="test-challenge", + code_challenge_method="S256", + scopes=["read"], + idp_tokens={ + "access_token": "upstream-at", + "refresh_token": "upstream-rt", + "expires_in": 3600, + "refresh_expires_in": 0, + "token_type": "Bearer", + }, + expires_at=time.time() + 300, + created_at=time.time(), + ) + await base_proxy._code_store.put(key=client_code.code, value=client_code) + + auth_code = AuthorizationCode( + code="base-zero-code", + scopes=["read"], + expires_at=time.time() + 300, + client_id="test-client", + code_challenge="test-challenge", + redirect_uri=AnyUrl("http://localhost:12345/callback"), + redirect_uri_provided_explicitly=True, + ) + result = await base_proxy.exchange_authorization_code( + client=client, + authorization_code=auth_code, + ) + assert result.refresh_token is not None + + jti_mapping = await base_proxy._jti_mapping_store.get( + key=base_proxy.jwt_issuer.verify_token( + result.refresh_token, expected_token_use="refresh" + )["jti"] + ) + assert jti_mapping is not None + upstream_token_set = await base_proxy._upstream_token_store.get( + key=jti_mapping.upstream_token_id + ) + assert upstream_token_set is not None + # Base proxy: never_expires stays False, expires_at is set (wall-clock fallback) + assert upstream_token_set.refresh_token_never_expires is False + assert upstream_token_set.refresh_token_expires_at is not None + class TestTransparentUpstreamRefresh: """Tests for transparent upstream token refresh in load_access_token. @@ -1269,3 +1601,162 @@ async def test_upstream_claims_not_mutated_on_cached_token( returned = await mock_verifier.verify_token(call.args[0]) if returned: assert "upstream_claims" not in returned.claims + + +class TestKeycloakTransparentRefreshZero: + """The transparent-refresh path (load_access_token) must preserve the + Keycloak never-expires sentinel when a rotated offline token comes back + with refresh_expires_in=0. + + This is the third token path (alongside auth-code exchange and explicit + refresh-token exchange) and was previously only covered indirectly. + """ + + @pytest.fixture + def mock_verifier(self): + verifier = Mock(spec=TokenVerifier) + verifier.required_scopes = ["read"] + + async def verify(token: str) -> AccessToken | None: + if token.startswith("refreshed-") or token.startswith("valid-"): + return AccessToken( + token=token, + client_id="test-client", + scopes=["read"], + expires_at=int(time.time() + 3600), + ) + return None + + verifier.verify_token = AsyncMock(side_effect=verify) + return verifier + + @pytest.fixture + def proxy(self, mock_verifier): + proxy = KeycloakOAuthProxy( + realm_url="https://keycloak.example.com/realms/test", + upstream_client_id="test-client", + upstream_client_secret="test-secret", + token_verifier=mock_verifier, + base_url="https://proxy.example.com", + jwt_signing_key="test-secret-key", + client_storage=MemoryStore(), + ) + proxy.set_mcp_path("/mcp") + return proxy + + async def _setup_expired_session(self, proxy) -> str: + upstream_token_id = "upstream-tok-id" + access_jti = "test-access-jti" + await proxy._upstream_token_store.put( + key=upstream_token_id, + value=UpstreamTokenSet( + upstream_token_id=upstream_token_id, + access_token="expired-upstream-access", + refresh_token="upstream-refresh-tok", + refresh_token_expires_at=time.time() + 86400, + expires_at=time.time() - 60, + token_type="Bearer", + scope="read", + client_id="test-client", + created_at=time.time() - 3600, + ), + ttl=86400, + ) + await proxy._jti_mapping_store.put( + key=access_jti, + value=JTIMapping( + jti=access_jti, + upstream_token_id=upstream_token_id, + created_at=time.time(), + ), + ttl=3600, + ) + return proxy.jwt_issuer.issue_access_token( + client_id="test-client", + scopes=["read"], + jti=access_jti, + expires_in=3600, + ) + + async def test_transparent_refresh_zero_preserves_never_expires(self, proxy): + fastmcp_jwt = await self._setup_expired_session(proxy) + + mock_oauth_client = AsyncMock() + mock_oauth_client.refresh_token = AsyncMock( + return_value={ + "access_token": "refreshed-upstream-access", + "token_type": "Bearer", + "expires_in": 3600, + "refresh_token": "rotated-upstream-refresh", + "refresh_expires_in": 0, # Keycloak offline sentinel + "scope": "read", + } + ) + + with patch.object( + proxy, "_create_upstream_oauth_client", return_value=mock_oauth_client + ): + result = await proxy.load_access_token(fastmcp_jwt) + + assert result is not None + assert result.token == "refreshed-upstream-access" + + stored = await proxy._upstream_token_store.get(key="upstream-tok-id") + assert stored is not None + assert stored.refresh_token == "rotated-upstream-refresh" + # The sentinel must be preserved, not reset to a wall-clock deadline. + assert stored.refresh_token_never_expires is True + assert stored.refresh_token_expires_at is None + + async def test_base_proxy_transparent_refresh_zero_is_not_never_expires(self): + """The base OAuthProxy must NOT treat a transparent-refresh + refresh_expires_in=0 as never-expires (opt-in only).""" + verifier = Mock(spec=TokenVerifier) + verifier.required_scopes = ["read"] + + async def verify(token: str) -> AccessToken | None: + if token.startswith("refreshed-"): + return AccessToken( + token=token, + client_id="test-client", + scopes=["read"], + expires_at=int(time.time() + 3600), + ) + return None + + verifier.verify_token = AsyncMock(side_effect=verify) + proxy = OAuthProxy( + upstream_authorization_endpoint="https://idp.example.com/authorize", + upstream_token_endpoint="https://idp.example.com/token", + upstream_client_id="test-client", + upstream_client_secret="test-secret", + token_verifier=verifier, + base_url="https://proxy.example.com", + jwt_signing_key="test-secret-key", + client_storage=MemoryStore(), + ) + proxy.set_mcp_path("/mcp") + fastmcp_jwt = await TestKeycloakTransparentRefreshZero._setup_expired_session( + self, proxy + ) + + mock_oauth_client = AsyncMock() + mock_oauth_client.refresh_token = AsyncMock( + return_value={ + "access_token": "refreshed-upstream-access", + "token_type": "Bearer", + "expires_in": 3600, + "refresh_token": "rotated-upstream-refresh", + "refresh_expires_in": 0, + "scope": "read", + } + ) + with patch.object( + proxy, "_create_upstream_oauth_client", return_value=mock_oauth_client + ): + result = await proxy.load_access_token(fastmcp_jwt) + + assert result is not None + stored = await proxy._upstream_token_store.get(key="upstream-tok-id") + assert stored is not None + assert stored.refresh_token_never_expires is False