diff --git a/docs/docs.json b/docs/docs.json index de206e6c0..7528b6fb9 100644 --- a/docs/docs.json +++ b/docs/docs.json @@ -200,6 +200,7 @@ "integrations/github", "integrations/scalekit", "integrations/google", + "integrations/keycloak", "integrations/workos" ] }, diff --git a/docs/integrations/keycloak.mdx b/docs/integrations/keycloak.mdx new file mode 100644 index 000000000..d5425522d --- /dev/null +++ b/docs/integrations/keycloak.mdx @@ -0,0 +1,293 @@ +--- +title: Keycloak OAuth ๐Ÿค FastMCP +sidebarTitle: Keycloak +description: Secure your FastMCP server with Keycloak OAuth +icon: shield-check +tag: NEW +--- + +import { VersionBadge } from "/snippets/version-badge.mdx" + + + +This guide shows you how to secure your FastMCP server using **Keycloak OAuth**. This integration uses the [**Remote OAuth**](/servers/auth/remote-oauth) pattern with Dynamic Client Registration (DCR), where Keycloak handles user login and your FastMCP server validates the tokens. + +## Configuration + +### Prerequisites + +Before you begin, you will need: +1. A **[Keycloak](https://keycloak.org/)** server instance running (can be localhost for development, e.g., `http://localhost:8080`) + + +To spin up Keycloak instantly on your local machine, use Docker: + +```bash +docker run --rm \ + --name keycloak-fastmcp \ + -p 8080:8080 \ + -e KC_BOOTSTRAP_ADMIN_USERNAME=admin \ + -e KC_BOOTSTRAP_ADMIN_PASSWORD=admin123 \ + quay.io/keycloak/keycloak:26.3 \ + start-dev +``` + +Then access the admin console at `http://localhost:8080` with username `admin` and password `admin123`. + + + +If you prefer using Docker Compose instead, you may want to have a look at the [`docker-compose.yaml`](https://github.com/jlowin/fastmcp/blob/main/examples/auth/keycloak_auth/keycloak/docker-compose.yml) file included in the Keycloak auth example. + + +2. Administrative access to create and configure a Keycloak realm +3. Your FastMCP server's URL (can be localhost for development, e.g., `http://localhost:8000`) + +### Step 1: Configure Keycloak for Dynamic Client Registration (DCR) + + + + Before importing, you should review and customize the pre-configured realm file: + + 1. Download the FastMCP Keycloak realm configuration: [`realm-fastmcp.json`](https://github.com/jlowin/fastmcp/blob/main/examples/auth/keycloak_auth/keycloak/realm-fastmcp.json) + 2. Open the file in a text editor and customize as needed: + - **Realm name and display name**: Change `"realm": "fastmcp"` and `"displayName": "FastMCP Realm"` to match your project + - **Trusted hosts configuration**: Look for `"trusted-hosts"` section and update IP addresses if needed + - `localhost`: For local development + - `172.17.0.1`: Docker network gateway IP address (required when Keycloak is run with Docker and MCP server directly on localhost) + - `172.18.0.1`: Docker Compose network gateway IP address (required when Keycloak is run with Docker Compose and MCP server directly on localhost) + - For production, replace these with your actual domain names + 3. **Review the test user**: The file includes a test user (`testuser` with password `password123`). You may want to: + - Change the credentials for security + - Replace with more meaningful user accounts + - Or remove and create users later through the admin interface + + + **Production Security**: Always review and customize the configuration before importing, especially realm names, trusted hosts, and user credentials. + + + + + + The following instructions are based on **Keycloak 26.3**. Menu items, tabs, and interface elements may be slightly different in other Keycloak versions, but the core configuration concepts remain the same. + + + 1. In the left-side navigation, click **Manage realms** (if not visible, click the hamburger menu (โ˜ฐ) in the top-left corner to expand the navigation) + 2. Click **Create realm** + 3. In the "Create realm" dialog: + - Drag your `realm-fastmcp.json` file into the **Resource file** box (or use the "Browse" button to find and select it) + - Keycloak will automatically read the realm name (`fastmcp`) from the file + - Click the **Create** button + + That's it! This single action will create the `fastmcp` realm and instantly configure everything from the file: + - The realm settings (including user registration policies) + - The test user with their credentials + - All the necessary Client Policies and Client Profiles required to support Dynamic Client Registration (DCR) + - Trusted hosts configuration for secure client registration + + + You may see this warning in the Keycloak logs during import: + ``` + Failed to deserialize client policies in the realm fastmcp.Fallback to return empty profiles. + Details: Unrecognized field "profiles" (class org.keycloak.representations.idm.ClientPoliciesRepresentation), + not marked as ignorable (2 known properties: "policies","globalPolicies"]) + ``` + This is due to Keycloak's buggy/strict parser not recognizing valid older JSON formats but doesn't seem to impact functionality and can be safely ignored. + + + + + After import, verify your realm is properly configured: + + 1. **Check the realm URL**: `http://localhost:8080/realms/fastmcp` + 2. **Verify DCR policies**: Navigate to **Clients** โ†’ **Client registration** to see the imported `"Trusted Hosts"` policy with the trusted hosts you have configured earlier + 3. **Test user access**: The imported test user can be used for initial testing + + + Your realm is now ready for FastMCP integration with Dynamic Client Registration fully configured! + + + + +### Step 2: FastMCP Configuration + + +**Security Best Practice**: Always configure the `audience` parameter in production environments. Without audience validation, your server will accept tokens issued for *any* audience, including tokens meant for completely different services. Set `audience` to your resource server identifier (typically your server's base URL) to ensure tokens are specifically intended for your server. + + +Create your FastMCP server file and use the KeycloakAuthProvider to handle all the OAuth integration automatically: + +```python server.py +from fastmcp import FastMCP +from fastmcp.server.auth.providers.keycloak import KeycloakAuthProvider +from fastmcp.server.dependencies import get_access_token + +# The KeycloakAuthProvider automatically discovers Keycloak endpoints +# and configures JWT token validation +auth_provider = KeycloakAuthProvider( + realm_url="http://localhost:8080/realms/fastmcp", # Your Keycloak realm URL + base_url="http://localhost:8000", # Your server's public URL + required_scopes=["openid", "profile"], # Required OAuth scopes + audience="http://localhost:8000", # Recommended: validate token audience +) + +# Create FastMCP server with auth +mcp = FastMCP(name="My Keycloak Protected Server", auth=auth_provider) + +@mcp.tool +async def get_access_token_claims() -> dict: + """Get the authenticated user's access token claims.""" + token = get_access_token() + return { + "sub": token.claims.get("sub"), + "name": token.claims.get("name"), + "preferred_username": token.claims.get("preferred_username"), + "scope": token.claims.get("scope") + } +``` + +## Testing + +To test your server, you can use the `fastmcp` CLI to run it locally. Assuming you've saved the above code to `server.py` (after replacing the realm URL and base URL with your actual values!), you can run the following command: + +```bash +fastmcp run server.py --transport http --port 8000 +``` + +Now, you can use a FastMCP client to test that you can reach your server after authenticating: + +```python +import asyncio +from fastmcp import Client + +async def main(): + async with Client("http://localhost:8000/mcp/", auth="oauth") as client: + # First-time connection will open Keycloak login in your browser + print("โœ“ Authenticated with Keycloak!") + + # Test the protected tool + result = await client.call_tool("get_access_token_claims") + print(f"User: {result['preferred_username']}") + +if __name__ == "__main__": + asyncio.run(main()) +``` + +When you run the client for the first time: +1. Your browser will open to Keycloak's authorization page +2. After you log in and authorize the app, you'll be redirected back +3. The client receives the token and can make authenticated requests + + +The client caches tokens locally, so you won't need to re-authenticate for subsequent runs unless the token expires or you explicitly clear the cache. + + +### Troubleshooting: "Client not found" Error + + +If you restart Keycloak or change the realm configuration, you may end up seeing Keycloak showing a "Client not found" error instead of the login screen when running your client. This happens because FastMCP uses Dynamic Client Registration (DCR) and the client ID that was cached locally no longer exists on the Keycloak server. + +**Keycloak error**: "We are sorry... Client not found." + +**Solution**: Clear the local OAuth cache to force re-registration with Keycloak: + +```python +from fastmcp.client.auth.oauth import FileTokenStorage + +# Clear OAuth cache for your specific MCP server +storage = FileTokenStorage("http://localhost:8000/mcp/") # Use your MCP server URL +storage.clear() + +# Or clear all OAuth cache data for all MCP servers +FileTokenStorage.clear_all() +``` + +After clearing the cache, run your client again. It will automatically re-register with Keycloak and obtain new credentials. + + +## Environment Variables + +For production deployments, use environment variables instead of hardcoding credentials. + +### Provider Selection + +Setting this environment variable allows the Keycloak provider to be used automatically without explicitly instantiating it in code. + + + +Set to `fastmcp.server.auth.providers.keycloak.KeycloakAuthProvider` to use Keycloak authentication. + + + +### Keycloak-Specific Configuration + +These environment variables provide default values for the Keycloak provider, whether it's instantiated manually or configured via `FASTMCP_SERVER_AUTH`. + + + +Your Keycloak realm URL (e.g., `http://localhost:8080/realms/fastmcp` or `https://keycloak.example.com/realms/myrealm`) + + + +Public URL of your FastMCP server (e.g., `https://your-server.com` or `http://localhost:8000` for development) + + + +Comma-, space-, or JSON-separated list of required OAuth scopes (e.g., `openid profile` or `["openid","profile","email"]`) + + + +Audience(s) for JWT token validation. For production deployments, set this to your resource server identifier (typically your server's base URL) to ensure tokens are intended for your server. Without this, tokens issued for any audience will be accepted, which is a security risk. + + + +Example `.env` file: +```bash +# Use the Keycloak provider +FASTMCP_SERVER_AUTH=fastmcp.server.auth.providers.keycloak.KeycloakAuthProvider + +# Keycloak configuration +FASTMCP_SERVER_AUTH_KEYCLOAK_REALM_URL=http://localhost:8080/realms/fastmcp +FASTMCP_SERVER_AUTH_KEYCLOAK_BASE_URL=https://your-server.com +FASTMCP_SERVER_AUTH_KEYCLOAK_REQUIRED_SCOPES=openid,profile,email +FASTMCP_SERVER_AUTH_KEYCLOAK_AUDIENCE=https://your-server.com # Recommended for production +``` + +With environment variables set, your server code simplifies to: + +```python server.py +from fastmcp import FastMCP + +# Authentication is automatically configured from environment +mcp = FastMCP(name="My Keycloak Protected Server") + +@mcp.tool +async def protected_operation() -> str: + """Perform a protected operation.""" + # Your tool implementation here + return "Operation completed successfully" +``` + +## Advanced Configuration + +### Custom Token Verifier + +For advanced use cases, you can provide a custom token verifier: + +```python +from fastmcp.server.auth.providers.jwt import JWTVerifier +from fastmcp.server.auth.providers.keycloak import KeycloakAuthProvider + +# Custom JWT verifier with specific audience +custom_verifier = JWTVerifier( + jwks_uri="http://localhost:8080/realms/fastmcp/.well-known/jwks.json", + issuer="http://localhost:8080/realms/fastmcp", + audience="my-specific-client", + required_scopes=["api:read", "api:write"] +) + +auth_provider = KeycloakAuthProvider( + realm_url="http://localhost:8080/realms/fastmcp", + base_url="http://localhost:8000", + token_verifier=custom_verifier +) +``` diff --git a/examples/auth/keycloak_auth/.env.example b/examples/auth/keycloak_auth/.env.example new file mode 100644 index 000000000..7a4aa4fe1 --- /dev/null +++ b/examples/auth/keycloak_auth/.env.example @@ -0,0 +1,10 @@ +# Keycloak Configuration +FASTMCP_SERVER_AUTH_KEYCLOAK_BASE_URL=http://localhost:8000 +FASTMCP_SERVER_AUTH_KEYCLOAK_REALM_URL=http://localhost:8080/realms/fastmcp + +# Optional: Specific scopes +FASTMCP_SERVER_AUTH_KEYCLOAK_REQUIRED_SCOPES=openid,profile + +# Optional: Audience validation (recommended for production) +# If not set, defaults to base_url +# FASTMCP_SERVER_AUTH_KEYCLOAK_AUDIENCE=http://localhost:8000 diff --git a/examples/auth/keycloak_auth/README.md b/examples/auth/keycloak_auth/README.md new file mode 100644 index 000000000..2dfd38468 --- /dev/null +++ b/examples/auth/keycloak_auth/README.md @@ -0,0 +1,57 @@ +# Keycloak OAuth Example + +Demonstrates FastMCP server protection with Keycloak OAuth. + +## Setup + +### 1. Prepare the Realm Configuration + +Review the realm configuration file: [`keycloak/realm-fastmcp.json`](keycloak/realm-fastmcp.json) + +**Optional**: Customize the file for your environment: +- **Realm name**: Change `"realm": "fastmcp"` to match your project +- **Trusted hosts**: Update the `"trusted-hosts"` section for your environment +- **Test user**: Review credentials (`testuser` / `password123`) and change for security + +### 2. Set Up Keycloak + +Choose one of the following options: + +#### Option A: Local Keycloak Instance (Recommended for Testing) + +See [keycloak/README.md](keycloak/README.md) for details. + +**Note:** The realm will be automatically imported on startup. + +#### Option B: Existing Keycloak Instance + +Manually import the realm: +- Log in to your Keycloak Admin Console +- Click **Manage realms** โ†’ **Create realm** +- Drag the `realm-fastmcp.json` file into the **Resource file** box +- Click **Create** + +### 3. Run the Example + +1. Set environment variables: + + ```bash + export FASTMCP_SERVER_AUTH_KEYCLOAK_REALM_URL="http://localhost:8080/realms/fastmcp" + export FASTMCP_SERVER_AUTH_KEYCLOAK_BASE_URL="http://localhost:8000" + # Optional: Set audience for token validation (defaults to base_url if not set) + # export FASTMCP_SERVER_AUTH_KEYCLOAK_AUDIENCE="http://localhost:8000" + ``` + +2. Run the server: + + ```bash + python server.py + ``` + +3. In another terminal, run the client: + + ```bash + python client.py + ``` + +The client will open your browser for Keycloak authentication. diff --git a/examples/auth/keycloak_auth/client.py b/examples/auth/keycloak_auth/client.py new file mode 100644 index 000000000..ee3f484e4 --- /dev/null +++ b/examples/auth/keycloak_auth/client.py @@ -0,0 +1,60 @@ +"""OAuth client example for connecting to FastMCP servers. + +This example demonstrates how to connect to a Keycloak-protected FastMCP server. + +To run: + python client.py +""" + +import asyncio + +from fastmcp import Client +from fastmcp.client.auth.oauth import FileTokenStorage + +SERVER_URL = "http://localhost:8000/mcp" + +# Set to True to clear any previously stored tokens. +# This is useful if you have just restarted Keycloak and end up seeing +# Keycloak showing this error: "We are sorry... Client not found." +# instead of the login screen +CLEAR_TOKEN_CACHE = False + + +async def main(): + if CLEAR_TOKEN_CACHE: + storage = FileTokenStorage(f"{SERVER_URL.rstrip('/')}/") + storage.clear() + print("๐Ÿงน Cleared cached OAuth tokens.") + + try: + async with Client(SERVER_URL, auth="oauth") as client: + assert await client.ping() + print("โœ… Successfully authenticated!") + + tools = await client.list_tools() + print(f"๐Ÿ”ง Available tools ({len(tools)}):") + for tool in tools: + print(f" - {tool.name}: {tool.description}") + + # Test the protected tool + print("๐Ÿ”’ Calling protected tool: get_access_token_claims") + result = await client.call_tool("get_access_token_claims") + claims = result.data + print("๐Ÿ“„ Available access token claims:") + print(f" - sub: {claims.get('sub', 'N/A')}") + print(f" - name: {claims.get('name', 'N/A')}") + print(f" - given_name: {claims.get('given_name', 'N/A')}") + print(f" - family_name: {claims.get('family_name', 'N/A')}") + print(f" - preferred_username: {claims.get('preferred_username', 'N/A')}") + print(f" - scope: {claims.get('scope', [])}") + + except Exception as e: + print(f"โŒ Authentication failed: {e}") + + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + # Graceful shutdown, suppress noisy logs resulting from asyncio.run task cancellation propagation + pass diff --git a/examples/auth/keycloak_auth/keycloak/README.md b/examples/auth/keycloak_auth/keycloak/README.md new file mode 100644 index 000000000..99016489d --- /dev/null +++ b/examples/auth/keycloak_auth/keycloak/README.md @@ -0,0 +1,131 @@ +# Local Keycloak Instance Setup + +This guide shows how to set up a local Keycloak instance for testing the FastMCP Keycloak OAuth example. + +## Quick Start + +**Prerequisites**: Docker and Docker Compose must be installed. + +Start the local Keycloak instance with Docker Compose: + +```bash +cd examples/auth/keycloak_auth/keycloak +./start-keycloak.sh +``` + +This script will: +- Start a Keycloak container on port 8080 +- Automatically import the preconfigured/customized `fastmcp` realm from [`realm-fastmcp.json`](realm-fastmcp.json) +- Create a test user (`testuser` / `password123`) + + +**Keycloak Admin Console**: [http://localhost:8080/admin](http://localhost:8080/admin) (admin / admin123) + +## Preconfigured Realm + +The Docker setup automatically imports a preconfigured realm configured for dynamic client registration. The default settings are described below and can be adjusted or complemented as needed by editing the [`realm-fastmcp.json`](realm-fastmcp.json) file before starting Keycloak. If settings are changed after Keycloak has been started, restart Keycloak with + +```bash +docker-compose restart +``` + +to apply the changes. + +### Realm: `fastmcp` + +The realm is configured with: + +- **Dynamic Client Registration** enabled for `http://localhost:8000/*` +- **Registration Allowed**: Yes +- **Allowed Client Scopes**: `openid`, `profile`, `email`, `roles`, `offline_access`, `web-origins`, `basic` +- **Trusted Hosts**: `localhost`, `172.17.0.1`, `172.18.0.1` + +### Test User + +The realm includes a test user: + +- **Username**: `testuser` +- **Password**: `password123` +- **Email**: `testuser@example.com` +- **First Name**: Test +- **Last Name**: User + +### Dynamic Client Registration + +The FastMCP server will automatically register a client with Keycloak on first run. The client registration policy ensures: + +- Client URIs must match `http://localhost:8000/*` +- Only allowed client scopes can be requested +- Client registration requests must come from trusted hosts + +### Token Claims + +Access tokens include standard OpenID Connect claims: +- `sub`: User identifier +- `preferred_username`: Username +- `email`: User email address +- `given_name`: First name +- `family_name`: Last name +- `realm_access`: Realm-level roles +- `resource_access`: Client-specific roles + +## Docker Configuration + +The setup uses the following Docker configuration: + +- **Container name**: `keycloak-fastmcp` +- **Port**: `8080` +- **Database**: H2 (in-memory, for development only) +- **Admin credentials**: `admin` / `admin123` +- **Realm import**: `realm-fastmcp.json` + +For production use, consider: +- Using a persistent database (PostgreSQL, MySQL) +- Configuring HTTPS +- Using proper admin credentials +- Enabling audit logging +- Restricting dynamic client registration or using pre-registered clients + +## Troubleshooting + +### View Keycloak Logs + +```bash +docker-compose logs -f keycloak +``` + +### Common Issues + +1. **Keycloak not starting** + - Check Docker is running: `docker ps` + - Check port 8080 is not in use: + - Linux/macOS: `netstat -an | grep 8080` or `lsof -i :8080` + - Windows: `netstat -an | findstr 8080` + +2. **Realm not found** + - Verify realm import: Check admin console at [http://localhost:8080/admin](http://localhost:8080/admin) + - Check realm file exists: + - Linux/macOS: `ls realm-fastmcp.json` + - Windows: `dir realm-fastmcp.json` + +3. **Client registration failed** + - Verify the request comes from a trusted host + - Check that redirect URIs match the allowed pattern (`http://localhost:8000/*`) + - Review client registration policies in the admin console + +4. **"Client not found" error after Keycloak restart** + - This happens because FastMCP uses Dynamic Client Registration (DCR) and the client ID that was cached locally no longer exists on the Keycloak server after restart + - **Solution**: Clear the local OAuth cache to force re-registration: + + ```python + from fastmcp.client.auth.oauth import FileTokenStorage + + # Clear OAuth cache for your specific MCP server + storage = FileTokenStorage("http://localhost:8000/mcp/") + storage.clear() + + # Or clear all OAuth cache data + FileTokenStorage.clear_all() + ``` + + - After clearing the cache, run your client again to automatically re-register with Keycloak diff --git a/examples/auth/keycloak_auth/keycloak/docker-compose.yml b/examples/auth/keycloak_auth/keycloak/docker-compose.yml new file mode 100644 index 000000000..ab3484b86 --- /dev/null +++ b/examples/auth/keycloak_auth/keycloak/docker-compose.yml @@ -0,0 +1,28 @@ +services: + keycloak: + image: quay.io/keycloak/keycloak:26.3 + container_name: keycloak-fastmcp + environment: + # Admin credentials + KC_BOOTSTRAP_ADMIN_USERNAME: admin + KC_BOOTSTRAP_ADMIN_PASSWORD: admin123 + + # Overwrite existing realm in database by re-importing realm from + # `./data/import/realm-export.json` upon every startup + KC_IMPORT_REALM_STRATEGY: OVERWRITE_EXISTING + ports: + - "8080:8080" + + command: + - start-dev + - --import-realm + + volumes: + - ./realm-fastmcp.json:/opt/keycloak/data/import/realm-export.json + + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8080/health/ready"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 60s diff --git a/examples/auth/keycloak_auth/keycloak/realm-fastmcp.json b/examples/auth/keycloak_auth/keycloak/realm-fastmcp.json new file mode 100644 index 000000000..102e37e89 --- /dev/null +++ b/examples/auth/keycloak_auth/keycloak/realm-fastmcp.json @@ -0,0 +1,95 @@ +{ + "realm": "fastmcp", + "displayName": "FastMCP Realm", + "enabled": true, + "keycloakVersion": "26.3.5", + "registrationAllowed": true, + "components": { + "org.keycloak.services.clientregistration.policy.ClientRegistrationPolicy": [ + { + "name": "Trusted Hosts", + "providerId": "trusted-hosts", + "subType": "anonymous", + "subComponents": {}, + "config": { + "host-sending-registration-request-must-match": [ + "true" + ], + "trusted-hosts": [ + "localhost", + "172.17.0.1", + "172.18.0.1" + ], + "client-uris-must-match": [ + "true" + ] + } + } + ] + }, + "users": [ + { + "username": "testuser", + "email": "testuser@example.com", + "firstName": "Test", + "lastName": "User", + "enabled": true, + "emailVerified": true, + "credentials": [ + { + "type": "password", + "value": "password123", + "temporary": false + } + ] + } + ], + "clientPolicies": { + "policies": [ + { + "name": "Allowed Client Scopes", + "enabled": true, + "conditions": [ + { + "condition": "client-scopes", + "configuration": { + "allowed-client-scopes": [ + "openid", + "profile", + "email", + "roles", + "offline_access", + "web-origins", + "basic" + ] + } + } + ] + }, + { + "name": "Allowed Client URIs", + "enabled": true, + "conditions": [ + { + "condition": "client-uris", + "configuration": { + "uris": [ + "http://localhost:8000/*" + ] + } + } + ] + } + ], + "profiles": [ + { + "name": "dynamic-client-registration-profile", + "to-clients-dynamically-registered": true, + "policies": [ + "Allowed Client URIs", + "Allowed Client Scopes" + ] + } + ] + } +} \ No newline at end of file diff --git a/examples/auth/keycloak_auth/keycloak/start-keycloak.sh b/examples/auth/keycloak_auth/keycloak/start-keycloak.sh new file mode 100644 index 000000000..1a97295d7 --- /dev/null +++ b/examples/auth/keycloak_auth/keycloak/start-keycloak.sh @@ -0,0 +1,63 @@ +#!/bin/bash + +# Keycloak Start Script +# Starts a local Keycloak instance with Docker Compose + +set -e + +echo "๐Ÿš€ Starting Keycloak..." + +# Check if Docker is running +if ! docker info > /dev/null 2>&1; then + echo "โŒ Docker is not running. Please start Docker first." + exit 1 +fi + +# Start Keycloak using docker-compose +echo "๐Ÿณ Starting Keycloak with docker-compose..." +docker-compose up -d + +# Wait for Keycloak to become ready +echo "โณ Waiting for Keycloak to become ready..." +echo "" + +timeout=120 +counter=0 + +while [ $counter -lt $timeout ]; do + if curl -s http://localhost:8080/health/ready > /dev/null 2>&1; then + echo "โœ… Keycloak is ready!" + break + fi + + # Show recent logs while waiting + echo " Still waiting... ($counter/$timeout seconds)" + echo " Recent logs:" + docker logs --tail 3 keycloak-fastmcp 2>/dev/null | sed 's/^/ /' || echo " (logs not available yet)" + echo "" + + sleep 5 + counter=$((counter + 5)) +done + +if [ $counter -ge $timeout ]; then + echo "โŒ Keycloak failed to get ready within $timeout seconds" + echo " Check logs with: docker logs -f keycloak-fastmcp" + exit 1 +fi + +echo "" +echo "๐ŸŽ‰ Keycloak is ready!" +echo "" +echo "Keycloak Admin Console: http://localhost:8080/admin" +echo " Username: admin" +echo " Password: admin123" +echo "" +echo "Test User Credentials:" +echo " Username: testuser" +echo " Password: password123" +echo "" +echo "Useful commands:" +echo " โ€ข Check Keycloak logs: docker logs -f keycloak-fastmcp" +echo " โ€ข Stop Keycloak: docker-compose down" +echo " โ€ข Restart Keycloak: docker-compose restart" \ No newline at end of file diff --git a/examples/auth/keycloak_auth/requirements.txt b/examples/auth/keycloak_auth/requirements.txt new file mode 100644 index 000000000..081c6d1da --- /dev/null +++ b/examples/auth/keycloak_auth/requirements.txt @@ -0,0 +1,2 @@ +fastmcp>=0.1.0 +python-dotenv>=1.0.0 \ No newline at end of file diff --git a/examples/auth/keycloak_auth/server.py b/examples/auth/keycloak_auth/server.py new file mode 100644 index 000000000..5f54ae2c3 --- /dev/null +++ b/examples/auth/keycloak_auth/server.py @@ -0,0 +1,82 @@ +"""Keycloak OAuth server example for FastMCP. + +This example demonstrates how to protect a FastMCP server with Keycloak. + +Required environment variables: +- FASTMCP_SERVER_AUTH_KEYCLOAK_REALM_URL: Your Keycloak realm URL +- FASTMCP_SERVER_AUTH_KEYCLOAK_BASE_URL: Your FastMCP server base URL + +Optional environment variables: +- FASTMCP_SERVER_AUTH_KEYCLOAK_REQUIRED_SCOPES: Required OAuth scopes (default: "openid,profile") +- FASTMCP_SERVER_AUTH_KEYCLOAK_AUDIENCE: Audience for JWT validation (default: base_url) + +To run: + python server.py +""" + +import os + +from dotenv import load_dotenv + +from fastmcp import FastMCP +from fastmcp.server.auth.providers.keycloak import KeycloakAuthProvider +from fastmcp.server.dependencies import get_access_token +from fastmcp.utilities.logging import configure_logging + +# Load environment overrides before configuring logging +load_dotenv(".env", override=True) + +# Configure FastMCP logging to INFO +configure_logging(level="INFO") + +realm_url = os.getenv( + "FASTMCP_SERVER_AUTH_KEYCLOAK_REALM_URL", "http://localhost:8080/realms/fastmcp" +) +base_url = os.getenv("FASTMCP_SERVER_AUTH_KEYCLOAK_BASE_URL", "http://localhost:8000") +required_scopes = os.getenv( + "FASTMCP_SERVER_AUTH_KEYCLOAK_REQUIRED_SCOPES", "openid,profile" +) +audience = os.getenv("FASTMCP_SERVER_AUTH_KEYCLOAK_AUDIENCE", base_url) + +auth = KeycloakAuthProvider( + realm_url=realm_url, + base_url=base_url, + required_scopes=required_scopes, + audience=audience, # Validate token audience for security +) + +mcp = FastMCP("Keycloak OAuth Example Server", auth=auth) + + +@mcp.tool +def echo(message: str) -> str: + """Echo the provided message.""" + return message + + +@mcp.tool +async def get_access_token_claims() -> dict: + """Get the authenticated user's access token claims.""" + token = get_access_token() + if token is None or token.claims is None: + raise RuntimeError("No valid access token found. Authentication required.") + + return { + "sub": token.claims.get("sub"), + "name": token.claims.get("name"), + "given_name": token.claims.get("given_name"), + "family_name": token.claims.get("family_name"), + "preferred_username": token.claims.get("preferred_username"), + "scope": token.claims.get("scope"), + } + + +if __name__ == "__main__": + try: + mcp.run(transport="http", port=8000) + except KeyboardInterrupt: + # Graceful shutdown, suppress noisy logs resulting from asyncio.run task cancellation propagation + pass + except Exception as e: + # Unexpected internal error + print(f"โŒ Internal error: {e}") diff --git a/src/fastmcp/server/auth/providers/keycloak.py b/src/fastmcp/server/auth/providers/keycloak.py new file mode 100644 index 000000000..840240073 --- /dev/null +++ b/src/fastmcp/server/auth/providers/keycloak.py @@ -0,0 +1,470 @@ +"""Keycloak authentication provider for FastMCP. + +This module provides KeycloakAuthProvider - a complete authentication solution that integrates +with Keycloak's OAuth 2.1 and OpenID Connect services, supporting Dynamic Client Registration (DCR) +for seamless MCP client authentication. +""" + +from __future__ import annotations + +import json +from urllib.parse import urlencode + +import httpx +from pydantic import AnyHttpUrl, field_validator +from pydantic_settings import BaseSettings, SettingsConfigDict +from starlette.responses import JSONResponse, RedirectResponse +from starlette.routing import Route + +from fastmcp.server.auth import RemoteAuthProvider, TokenVerifier +from fastmcp.server.auth.oidc_proxy import OIDCConfiguration +from fastmcp.server.auth.providers.jwt import JWTVerifier +from fastmcp.utilities.auth import parse_scopes +from fastmcp.utilities.logging import get_logger +from fastmcp.utilities.types import NotSet, NotSetT + +logger = get_logger(__name__) + + +class KeycloakProviderSettings(BaseSettings): + model_config = SettingsConfigDict( + env_prefix="FASTMCP_SERVER_AUTH_KEYCLOAK_", + env_file=".env", + extra="ignore", + ) + + realm_url: AnyHttpUrl + base_url: AnyHttpUrl + required_scopes: list[str] | None = None + audience: str | list[str] | None = None + + @field_validator("required_scopes", mode="before") + @classmethod + def _parse_scopes(cls, v): + return parse_scopes(v) + + +class KeycloakAuthProvider(RemoteAuthProvider): + """Keycloak metadata provider for DCR (Dynamic Client Registration). + + This provider implements Keycloak integration using metadata forwarding and + dynamic endpoint discovery. This is the recommended approach for Keycloak DCR + as it allows Keycloak to handle the OAuth flow directly while FastMCP acts + as a resource server. + + IMPORTANT SETUP REQUIREMENTS: + + 1. Enable Dynamic Client Registration in Keycloak Admin Console: + - Go to Realm Settings โ†’ Client Registration + - Enable "Anonymous" or "Authenticated" access for Dynamic Client Registration + - Configure Client Registration Policies as needed + + 2. Note your Realm URL: + - Example: https://keycloak.example.com/realms/myrealm + - This should be the full URL to your specific realm + + For detailed setup instructions, see: + https://www.keycloak.org/securing-apps/client-registration + + SECURITY NOTE: + By default, audience validation is disabled to support flexible Dynamic Client + Registration flows. For production deployments, it's strongly recommended to + configure the `audience` parameter to validate that tokens are intended for your + resource server. This prevents tokens issued for other services from being accepted. + + Examples: + ```python + from fastmcp import FastMCP + from fastmcp.server.auth.providers.keycloak import KeycloakAuthProvider + + # Method 1: Direct parameters (with audience validation for production) + keycloak_auth = KeycloakAuthProvider( + realm_url="https://keycloak.example.com/realms/myrealm", + base_url="https://your-fastmcp-server.com", + required_scopes=["openid", "profile"], + audience="https://your-fastmcp-server.com", # Recommended for production + ) + + # Method 2: Environment variables + # Set: FASTMCP_SERVER_AUTH_KEYCLOAK_REALM_URL=https://keycloak.example.com/realms/myrealm + # Set: FASTMCP_SERVER_AUTH_KEYCLOAK_BASE_URL=https://your-fastmcp-server.com + # Set: FASTMCP_SERVER_AUTH_KEYCLOAK_REQUIRED_SCOPES=openid,profile + # Set: FASTMCP_SERVER_AUTH_KEYCLOAK_AUDIENCE=https://your-fastmcp-server.com + keycloak_auth = KeycloakAuthProvider() + + # Method 3: Custom token verifier + from fastmcp.server.auth.providers.jwt import JWTVerifier + + custom_verifier = JWTVerifier( + jwks_uri="https://keycloak.example.com/realms/myrealm/.well-known/jwks.json", + issuer="https://keycloak.example.com/realms/myrealm", + audience="my-client-id", + required_scopes=["api:read", "api:write"] + ) + + keycloak_auth = KeycloakAuthProvider( + realm_url="https://keycloak.example.com/realms/myrealm", + base_url="https://your-fastmcp-server.com", + token_verifier=custom_verifier, + ) + + # Use with FastMCP + mcp = FastMCP("My App", auth=keycloak_auth) + ``` + """ + + def __init__( + self, + *, + realm_url: AnyHttpUrl | str | NotSetT = NotSet, + base_url: AnyHttpUrl | str | NotSetT = NotSet, + required_scopes: list[str] | None | NotSetT = NotSet, + audience: str | list[str] | None | NotSetT = NotSet, + token_verifier: TokenVerifier | None = None, + ): + """Initialize Keycloak metadata provider. + + Args: + realm_url: Your Keycloak realm URL (e.g., "https://keycloak.example.com/realms/myrealm") + base_url: Public URL of this FastMCP server + required_scopes: Optional list of scopes to require for all requests + audience: Optional audience(s) for JWT validation. If not specified and no custom + verifier is provided, audience validation is disabled. For production use, + it's recommended to set this to your resource server identifier or base_url. + token_verifier: Optional token verifier. If None, creates JWT verifier for Keycloak + """ + settings = KeycloakProviderSettings.model_validate( + { + k: v + for k, v in { + "realm_url": realm_url, + "base_url": base_url, + "required_scopes": required_scopes, + "audience": audience, + }.items() + if v is not NotSet + } + ) + + base_url = str(settings.base_url).rstrip("/") + self.realm_url = str(settings.realm_url).rstrip("/") + + # Discover OIDC configuration from Keycloak + self.oidc_config = self._discover_oidc_configuration() + + # Create default JWT verifier if none provided + if token_verifier is None: + # After discovery, jwks_uri and issuer are guaranteed non-None (defaults applied) + token_verifier = JWTVerifier( + jwks_uri=str(self.oidc_config.jwks_uri), + issuer=str(self.oidc_config.issuer), + algorithm="RS256", + required_scopes=settings.required_scopes, + audience=settings.audience, # Validate audience for security + ) + elif settings.required_scopes is not None: + # Merge provider-level required scopes into custom verifier + existing_scopes = list(token_verifier.required_scopes or []) + for scope in settings.required_scopes: + if scope not in existing_scopes: + existing_scopes.append(scope) + # Try to set merged scopes, but handle immutable verifiers gracefully + try: + token_verifier.required_scopes = existing_scopes + except (AttributeError, TypeError) as e: + logger.warning( + f"Cannot set required_scopes on custom verifier (immutable): {e}. " + "Provider-level scope requirements may not be enforced." + ) + + # Initialize RemoteAuthProvider with FastMCP as the authorization server proxy + super().__init__( + token_verifier=token_verifier, + authorization_servers=[AnyHttpUrl(base_url)], + base_url=base_url, + ) + + def _discover_oidc_configuration(self) -> OIDCConfiguration: + """Discover OIDC configuration from Keycloak with default value handling.""" + # Fetch original OIDC configuration from Keycloak + config_url = AnyHttpUrl(f"{self.realm_url}/.well-known/openid-configuration") + config = OIDCConfiguration.get_oidc_configuration( + config_url, strict=False, timeout_seconds=10 + ) + + # Apply default values for fields that might be missing + if not config.jwks_uri: + config.jwks_uri = f"{self.realm_url}/.well-known/jwks.json" + if not config.issuer: + config.issuer = self.realm_url + if not config.registration_endpoint: + config.registration_endpoint = ( + f"{self.realm_url}/clients-registrations/openid-connect" + ) + if not config.authorization_endpoint: + config.authorization_endpoint = ( + f"{self.realm_url}/protocol/openid-connect/auth" + ) + + return config + + def get_routes( + self, + mcp_path: str | None = None, + ) -> list[Route]: + """Get OAuth routes including authorization server metadata endpoint. + + This returns the standard protected resource routes plus an authorization server + metadata endpoint that allows OAuth clients to discover and participate in auth flows + with this MCP server acting as a proxy to Keycloak. + + The proxy is necessary to: + - Inject server-configured required scopes into client registration requests + - Modify client registration responses for FastMCP compatibility + - Inject server-configured required scopes into authorization requests + - Prevent CORS issues when FastMCP and Keycloak are on different origins + + Args: + mcp_path: The path where the MCP endpoint is mounted (e.g., "/mcp") + """ + # Get the standard protected resource routes from RemoteAuthProvider + routes = super().get_routes(mcp_path) + + async def oauth_authorization_server_metadata(request): + """Return OAuth authorization server metadata for this FastMCP authorization server proxy.""" + logger.debug("OAuth authorization server metadata endpoint called") + + # Create a copy of Keycloak OAuth metadata as starting point for the + # OAuth metadata of this FastMCP authorization server proxy + config = self.oidc_config.model_copy() + + # Add/modify registration and authorization endpoints to intercept + # Dynamic Client Registration (DCR) requests on this FastMCP authorization server proxy + base_url = str(self.base_url).rstrip("/") + config.registration_endpoint = f"{base_url}/register" + config.authorization_endpoint = f"{base_url}/authorize" + + # Return the OAuth metadata of this FastMCP authorization server proxy as JSON + metadata = config.model_dump(by_alias=True, exclude_none=True) + return JSONResponse(metadata) + + # Add authorization server metadata discovery endpoint + routes.append( + Route( + "/.well-known/oauth-authorization-server", + endpoint=oauth_authorization_server_metadata, + methods=["GET"], + ) + ) + + async def register_client_proxy(request): + """Proxy client registration to Keycloak with request and response modifications. + + This proxy modifies both the client registration request and response to ensure FastMCP + compatibility: + + Request modifications: + - Injects server-configured required scopes into the registration request to ensure the client + is granted the necessary scopes for token validation + + Response modifications: + - Changes token_endpoint_auth_method from 'client_secret_basic' to 'client_secret_post' + - Filters response_types to only include 'code' (removes 'none' and others) + + These modifications cannot be easily achieved through Keycloak server configuration + alone because: + - Scope assignment for dynamic clients can not be achieved the static configuration but + requires runtime injection + - Keycloak's default authentication flows advertise 'client_secret_basic' as token endpoint + authentication method globally and client-specific overrides would require pre-registration + or complex policies + - Response type filtering would require custom Keycloak extensions + """ + logger.debug("Client registration proxy endpoint called") + try: + # Get and parse the request body to retrieve client registration data + body = await request.body() + registration_data = json.loads(body) + logger.info( + f"Intercepting client registration request - redirect_uris: {registration_data.get('redirect_uris')}, scope: {registration_data.get('scope') or 'N/A'}" + ) + + # Add the server's required scopes to the client registration data + if self.token_verifier.required_scopes: + scopes = parse_scopes(registration_data.get("scope")) or [] + merged_scopes = scopes + [ + scope + for scope in self.token_verifier.required_scopes + if scope not in scopes + ] + logger.info( + f"Merging server-configured required scopes with client-requested scopes: {merged_scopes}" + ) + registration_data["scope"] = " ".join(merged_scopes) + # Update the body with modified client registration data + body = json.dumps(registration_data).encode("utf-8") + + # Forward the registration request to Keycloak + async with httpx.AsyncClient(timeout=10.0) as client: + logger.info( + f"Forwarding client registration to Keycloak: {self.oidc_config.registration_endpoint}" + ) + # Forward all headers except Host and hop-by-hop headers + # Exclude Content-Length so httpx can recompute it for the modified body + forward_headers = { + key: value + for key, value in request.headers.items() + if key.lower() + not in {"host", "content-length", "transfer-encoding"} + } + # Ensure Content-Type is set correctly for our JSON body + forward_headers["Content-Type"] = "application/json" + + response = await client.post( + str(self.oidc_config.registration_endpoint), + content=body, + headers=forward_headers, + ) + + # Read response body once and cache it to avoid double-decoding issues + response_body = response.content + + if response.status_code != 201: + error_detail = {"error": "registration_failed"} + try: + if response.headers.get("content-type", "").startswith( + "application/json" + ): + error_detail = json.loads(response_body) + else: + error_detail = { + "error": "registration_failed", + "error_description": response_body.decode("utf-8")[ + :500 + ] + if response_body + else f"HTTP {response.status_code}", + } + except Exception: + error_detail = { + "error": "registration_failed", + "error_description": f"HTTP {response.status_code}", + } + + return JSONResponse( + error_detail, + status_code=response.status_code, + ) + + # Modify the response to be compatible with FastMCP + logger.info( + "Modifying 'token_endpoint_auth_method' and 'response_types' in client info for FastMCP compatibility" + ) + client_info = json.loads(response_body) + + logger.debug( + f"Original client info from Keycloak: token_endpoint_auth_method={client_info.get('token_endpoint_auth_method')}, response_types={client_info.get('response_types')}, redirect_uris={client_info.get('redirect_uris')}" + ) + + # Fix token_endpoint_auth_method + client_info["token_endpoint_auth_method"] = "client_secret_post" + + # Fix response_types - ensure only "code" + if "response_types" in client_info: + client_info["response_types"] = ["code"] + + logger.debug( + f"Modified client info for FastMCP compatibility: token_endpoint_auth_method={client_info.get('token_endpoint_auth_method')}, response_types={client_info.get('response_types')}" + ) + + return JSONResponse(client_info, status_code=201) + + except Exception as e: + return JSONResponse( + { + "error": "server_error", + "error_description": f"Client registration failed: {e}", + }, + status_code=500, + ) + + # Add client registration proxy + routes.append( + Route( + "/register", + endpoint=register_client_proxy, + methods=["POST"], + ) + ) + + async def authorize_proxy(request): + """Proxy authorization requests to Keycloak with scope injection and CORS handling. + + This proxy is essential for scope management and CORS compatibility. It injects + server-configured required scopes into authorization requests, ensuring that OAuth + clients request the proper scopes even though they don't know what the server requires. + Additionally, it prevents CORS issues when FastMCP and Keycloak are on different origins. + + The proxy ensures: + - Injection of server-configured required scopes into the authorization request + - Compatibility with OAuth clients that expect same-origin authorization flows by letting authorization + requests stay on same origin as client registration requests + """ + logger.debug("Authorization proxy endpoint called") + try: + logger.info( + f"Intercepting authorization request - query_params: {request.query_params}" + ) + + # Add server-configured required scopes to the authorization request + # Use multi_items() to preserve duplicate query parameters (e.g., multiple 'resource' per RFC 8707) + query_items = list(request.query_params.multi_items()) + if self.token_verifier.required_scopes: + existing_scopes = ( + parse_scopes(request.query_params.get("scope")) or [] + ) + missing_scopes = [ + scope + for scope in self.token_verifier.required_scopes + if scope not in existing_scopes + ] + if missing_scopes: + logger.info( + f"Adding server-configured required scopes to authorization request: {missing_scopes}" + ) + scope_value = " ".join(existing_scopes + missing_scopes) + # Remove existing scope parameter and add the updated one + query_items = [(k, v) for k, v in query_items if k != "scope"] + query_items.append(("scope", scope_value)) + + # Build authorization request URL for redirecting to Keycloak and including the (potentially modified) query string + authorization_url = str(self.oidc_config.authorization_endpoint) + query_string = urlencode(query_items, doseq=True) + if query_string: + authorization_url += f"?{query_string}" + + # Redirect authorization request to Keycloak's authorization endpoint + logger.info( + f"Redirecting authorization request to Keycloak: {authorization_url}" + ) + return RedirectResponse(url=authorization_url, status_code=302) + + except Exception as e: + return JSONResponse( + { + "error": "server_error", + "error_description": f"Authorization request failed: {e}", + }, + status_code=500, + ) + + # Add authorization endpoint proxy + routes.append( + Route( + "/authorize", + endpoint=authorize_proxy, + methods=["GET"], + ) + ) + + return routes diff --git a/tests/integration_tests/auth/test_keycloak_provider_integration.py b/tests/integration_tests/auth/test_keycloak_provider_integration.py new file mode 100644 index 000000000..39ee9d7f0 --- /dev/null +++ b/tests/integration_tests/auth/test_keycloak_provider_integration.py @@ -0,0 +1,429 @@ +"""Integration tests for Keycloak OAuth provider.""" + +import asyncio +import os +from unittest.mock import AsyncMock, Mock, patch +from urllib.parse import parse_qs, urlparse + +import httpx +import pytest +from starlette.applications import Starlette +from starlette.responses import JSONResponse +from starlette.routing import Route + +from fastmcp import FastMCP +from fastmcp.server.auth.providers.keycloak import KeycloakAuthProvider + +TEST_REALM_URL = "https://keycloak.example.com/realms/test" +TEST_BASE_URL = "https://fastmcp.example.com" +TEST_REQUIRED_SCOPES = ["openid", "profile", "email"] + + +@pytest.fixture +def mock_keycloak_server(): + """Create a mock Keycloak server for integration testing.""" + + async def oidc_configuration(request): + """Mock OIDC configuration endpoint.""" + config = { + "issuer": TEST_REALM_URL, + "authorization_endpoint": f"{TEST_REALM_URL}/protocol/openid-connect/auth", + "token_endpoint": f"{TEST_REALM_URL}/protocol/openid-connect/token", + "jwks_uri": f"{TEST_REALM_URL}/.well-known/jwks.json", + "registration_endpoint": f"{TEST_REALM_URL}/clients-registrations/openid-connect", + "response_types_supported": ["code", "id_token", "token"], + "subject_types_supported": ["public"], + "id_token_signing_alg_values_supported": ["RS256"], + "scopes_supported": ["openid", "profile", "email"], + "grant_types_supported": ["authorization_code", "refresh_token"], + } + return JSONResponse(config) + + async def client_registration(request): + """Mock client registration endpoint.""" + body = await request.json() + client_info = { + "client_id": "keycloak-generated-client-id", + "client_secret": "keycloak-generated-client-secret", + "token_endpoint_auth_method": "client_secret_basic", # Keycloak default + "response_types": ["code", "none"], # Keycloak default + "redirect_uris": body.get("redirect_uris", []), + "scope": body.get("scope", "openid"), + "grant_types": ["authorization_code", "refresh_token"], + } + return JSONResponse(client_info, status_code=201) + + routes = [ + Route("/.well-known/openid-configuration", oidc_configuration, methods=["GET"]), + Route( + "/clients-registrations/openid-connect", + client_registration, + methods=["POST"], + ), + ] + + app = Starlette(routes=routes) + return app + + +class TestKeycloakProviderIntegration: + """Integration tests for KeycloakAuthProvider with mock Keycloak server.""" + + async def test_end_to_end_client_registration_flow(self, mock_keycloak_server): + """Test complete client registration flow with mock Keycloak.""" + # Mock the OIDC configuration request to the real Keycloak + with patch("httpx.get") as mock_get: + mock_response = Mock() + mock_response.json.return_value = { + "issuer": TEST_REALM_URL, + "authorization_endpoint": f"{TEST_REALM_URL}/protocol/openid-connect/auth", + "token_endpoint": f"{TEST_REALM_URL}/protocol/openid-connect/token", + "jwks_uri": f"{TEST_REALM_URL}/.well-known/jwks.json", + "registration_endpoint": f"{TEST_REALM_URL}/clients-registrations/openid-connect", + } + mock_response.raise_for_status.return_value = None + mock_get.return_value = mock_response + + # Create KeycloakAuthProvider + provider = KeycloakAuthProvider( + realm_url=TEST_REALM_URL, + base_url=TEST_BASE_URL, + required_scopes=TEST_REQUIRED_SCOPES, + ) + + # Create FastMCP app with the provider + mcp = FastMCP("test-server", auth=provider) + mcp_http_app = mcp.http_app() + + # Mock the actual HTTP client post method + with patch( + "fastmcp.server.auth.providers.keycloak.httpx.AsyncClient.post" + ) as mock_post: + # Mock Keycloak's response to client registration + mock_keycloak_response = Mock() + mock_keycloak_response.status_code = 201 + mock_keycloak_response.json.return_value = { + "client_id": "keycloak-generated-client-id", + "client_secret": "keycloak-generated-client-secret", + "token_endpoint_auth_method": "client_secret_basic", + "response_types": ["code", "none"], + "redirect_uris": ["http://localhost:8000/callback"], + } + mock_keycloak_response.headers = {"content-type": "application/json"} + mock_post.return_value = mock_keycloak_response + + # Test client registration through FastMCP proxy + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=mcp_http_app), + base_url=TEST_BASE_URL, + ) as client: + registration_data = { + "redirect_uris": ["http://localhost:8000/callback"], + "client_name": "test-mcp-client", + "client_uri": "http://localhost:8000", + } + + response = await client.post("/register", json=registration_data) + + # Verify the endpoint processed the request successfully + assert response.status_code == 201 + client_info = response.json() + assert "client_id" in client_info + assert "client_secret" in client_info + + # Verify the mock was called (meaning the proxy forwarded the request) + mock_post.assert_called_once() + + async def test_oauth_discovery_endpoints_integration(self): + """Test OAuth discovery endpoints work correctly together.""" + with patch("httpx.get") as mock_get: + mock_response = Mock() + mock_response.json.return_value = { + "issuer": TEST_REALM_URL, + "authorization_endpoint": f"{TEST_REALM_URL}/protocol/openid-connect/auth", + "token_endpoint": f"{TEST_REALM_URL}/protocol/openid-connect/token", + "jwks_uri": f"{TEST_REALM_URL}/.well-known/jwks.json", + "registration_endpoint": f"{TEST_REALM_URL}/clients-registrations/openid-connect", + } + mock_response.raise_for_status.return_value = None + mock_get.return_value = mock_response + + provider = KeycloakAuthProvider( + realm_url=TEST_REALM_URL, + base_url=TEST_BASE_URL, + required_scopes=TEST_REQUIRED_SCOPES, + ) + + mcp = FastMCP("test-server", auth=provider) + mcp_http_app = mcp.http_app() + + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=mcp_http_app), + base_url=TEST_BASE_URL, + ) as client: + # Test authorization server metadata + auth_server_response = await client.get( + "/.well-known/oauth-authorization-server" + ) + assert auth_server_response.status_code == 200 + auth_data = auth_server_response.json() + + # Test protected resource metadata + # Per RFC 9728, when the resource is at /mcp, the metadata endpoint is at /.well-known/oauth-protected-resource/mcp + resource_response = await client.get( + "/.well-known/oauth-protected-resource/mcp" + ) + assert resource_response.status_code == 200 + resource_data = resource_response.json() + + # Verify endpoints are consistent and correct + assert ( + auth_data["authorization_endpoint"] == f"{TEST_BASE_URL}/authorize" + ) + assert auth_data["registration_endpoint"] == f"{TEST_BASE_URL}/register" + assert auth_data["issuer"] == TEST_REALM_URL + assert ( + auth_data["jwks_uri"] == f"{TEST_REALM_URL}/.well-known/jwks.json" + ) + + assert resource_data["resource"] == f"{TEST_BASE_URL}/mcp" + assert f"{TEST_BASE_URL}/" in resource_data["authorization_servers"] + + async def test_authorization_flow_with_real_parameters(self): + """Test authorization flow with realistic OAuth parameters.""" + with patch("httpx.get") as mock_get: + mock_response = Mock() + mock_response.json.return_value = { + "issuer": TEST_REALM_URL, + "authorization_endpoint": f"{TEST_REALM_URL}/protocol/openid-connect/auth", + "token_endpoint": f"{TEST_REALM_URL}/protocol/openid-connect/token", + "jwks_uri": f"{TEST_REALM_URL}/.well-known/jwks.json", + } + mock_response.raise_for_status.return_value = None + mock_get.return_value = mock_response + + provider = KeycloakAuthProvider( + realm_url=TEST_REALM_URL, + base_url=TEST_BASE_URL, + required_scopes=TEST_REQUIRED_SCOPES, + ) + + mcp = FastMCP("test-server", auth=provider) + mcp_http_app = mcp.http_app() + + # Realistic OAuth authorization parameters + oauth_params = { + "response_type": "code", + "client_id": "test-client-id", + "redirect_uri": "http://localhost:8000/auth/callback", + "state": "random-state-string-12345", + "code_challenge": "dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk", + "code_challenge_method": "S256", + "nonce": "random-nonce-67890", + } + + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=mcp_http_app), + base_url=TEST_BASE_URL, + follow_redirects=False, + ) as client: + response = await client.get("/authorize", params=oauth_params) + + assert response.status_code == 302 + location = response.headers["location"] + + # Parse redirect URL to verify parameters + parsed = urlparse(location) + query_params = parse_qs(parsed.query) + + # Verify all parameters are preserved + assert query_params["response_type"][0] == "code" + assert query_params["client_id"][0] == "test-client-id" + assert ( + query_params["redirect_uri"][0] + == "http://localhost:8000/auth/callback" + ) + assert query_params["state"][0] == "random-state-string-12345" + assert ( + query_params["code_challenge"][0] + == "dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk" + ) + assert query_params["code_challenge_method"][0] == "S256" + assert query_params["nonce"][0] == "random-nonce-67890" + + # Verify scope injection + injected_scopes = query_params["scope"][0].split(" ") + assert set(injected_scopes) == set(TEST_REQUIRED_SCOPES) + + async def test_error_handling_with_keycloak_unavailable(self): + """Test error handling when Keycloak is unavailable.""" + # Mock network error when trying to discover OIDC configuration + with patch("httpx.get") as mock_get: + mock_get.side_effect = httpx.RequestError("Network error") + + with pytest.raises(Exception): # Should raise some network/discovery error + KeycloakAuthProvider( + realm_url=TEST_REALM_URL, + base_url=TEST_BASE_URL, + ) + + async def test_concurrent_client_registrations(self): + """Test handling multiple concurrent client registrations.""" + with patch("httpx.get") as mock_get: + mock_response = Mock() + mock_response.json.return_value = { + "issuer": TEST_REALM_URL, + "authorization_endpoint": f"{TEST_REALM_URL}/protocol/openid-connect/auth", + "token_endpoint": f"{TEST_REALM_URL}/protocol/openid-connect/token", + "jwks_uri": f"{TEST_REALM_URL}/.well-known/jwks.json", + "registration_endpoint": f"{TEST_REALM_URL}/clients-registrations/openid-connect", + } + mock_response.raise_for_status.return_value = None + mock_get.return_value = mock_response + + provider = KeycloakAuthProvider( + realm_url=TEST_REALM_URL, + base_url=TEST_BASE_URL, + required_scopes=TEST_REQUIRED_SCOPES, + ) + + mcp = FastMCP("test-server", auth=provider) + mcp_http_app = mcp.http_app() + + # Mock concurrent Keycloak responses + with patch( + "fastmcp.server.auth.providers.keycloak.httpx.AsyncClient" + ) as mock_client_class: + mock_client = AsyncMock() + mock_client_class.return_value.__aenter__.return_value = mock_client + + # Different responses for different clients + responses = [ + { + "client_id": f"client-{i}", + "client_secret": f"secret-{i}", + "token_endpoint_auth_method": "client_secret_basic", + "response_types": ["code", "none"], + } + for i in range(3) + ] + + mock_responses = [] + for response in responses: + mock_resp = Mock() + mock_resp.status_code = 201 + mock_resp.json.return_value = response + mock_resp.headers = {"content-type": "application/json"} + mock_responses.append(mock_resp) + + mock_client.post.side_effect = mock_responses + + # Make concurrent requests + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=mcp_http_app), + base_url=TEST_BASE_URL, + ) as client: + registration_data = [ + { + "redirect_uris": [f"http://localhost:800{i}/callback"], + "client_name": f"test-client-{i}", + } + for i in range(3) + ] + + # Send concurrent requests + tasks = [ + client.post("/register", json=data) + for data in registration_data + ] + responses = await asyncio.gather(*tasks) + + # Verify all requests succeeded + for i, response in enumerate(responses): + assert response.status_code == 201 + client_info = response.json() + assert "client_id" in client_info + assert "client_secret" in client_info + + +class TestKeycloakProviderEnvironmentConfiguration: + """Test configuration from environment variables in integration context.""" + + def test_provider_loads_all_settings_from_environment(self): + """Test that provider can be fully configured from environment.""" + env_vars = { + "FASTMCP_SERVER_AUTH_KEYCLOAK_REALM_URL": TEST_REALM_URL, + "FASTMCP_SERVER_AUTH_KEYCLOAK_BASE_URL": TEST_BASE_URL, + "FASTMCP_SERVER_AUTH_KEYCLOAK_REQUIRED_SCOPES": "openid,profile,email,custom:scope", + } + + with ( + patch.dict(os.environ, env_vars), + patch("httpx.get") as mock_get, + ): + mock_response = Mock() + mock_response.json.return_value = { + "issuer": TEST_REALM_URL, + "authorization_endpoint": f"{TEST_REALM_URL}/protocol/openid-connect/auth", + "token_endpoint": f"{TEST_REALM_URL}/protocol/openid-connect/token", + "jwks_uri": f"{TEST_REALM_URL}/.well-known/jwks.json", + } + mock_response.raise_for_status.return_value = None + mock_get.return_value = mock_response + + # Should work with no explicit parameters + provider = KeycloakAuthProvider() + + assert provider.realm_url == TEST_REALM_URL + assert str(provider.base_url) == TEST_BASE_URL + "/" + assert provider.token_verifier.required_scopes == [ + "openid", + "profile", + "email", + "custom:scope", + ] + + async def test_provider_works_in_production_like_environment(self): + """Test provider configuration that mimics production deployment.""" + production_env = { + "FASTMCP_SERVER_AUTH_KEYCLOAK_REALM_URL": "https://auth.company.com/realms/production", + "FASTMCP_SERVER_AUTH_KEYCLOAK_BASE_URL": "https://api.company.com", + "FASTMCP_SERVER_AUTH_KEYCLOAK_REQUIRED_SCOPES": "openid,profile,email,api:read,api:write", + } + + with ( + patch.dict(os.environ, production_env), + patch("httpx.get") as mock_get, + ): + mock_response = Mock() + mock_response.json.return_value = { + "issuer": "https://auth.company.com/realms/production", + "authorization_endpoint": "https://auth.company.com/realms/production/protocol/openid-connect/auth", + "token_endpoint": "https://auth.company.com/realms/production/protocol/openid-connect/token", + "jwks_uri": "https://auth.company.com/realms/production/.well-known/jwks.json", + "registration_endpoint": "https://auth.company.com/realms/production/clients-registrations/openid-connect", + } + mock_response.raise_for_status.return_value = None + mock_get.return_value = mock_response + + provider = KeycloakAuthProvider() + mcp = FastMCP("production-server", auth=provider) + mcp_http_app = mcp.http_app() + + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=mcp_http_app), + base_url="https://api.company.com", + ) as client: + # Test discovery endpoints work + response = await client.get("/.well-known/oauth-authorization-server") + assert response.status_code == 200 + data = response.json() + + assert data["issuer"] == "https://auth.company.com/realms/production" + assert ( + data["authorization_endpoint"] + == "https://api.company.com/authorize" + ) + assert ( + data["registration_endpoint"] == "https://api.company.com/register" + ) diff --git a/tests/server/auth/providers/test_keycloak.py b/tests/server/auth/providers/test_keycloak.py new file mode 100644 index 000000000..49ad8d5dd --- /dev/null +++ b/tests/server/auth/providers/test_keycloak.py @@ -0,0 +1,475 @@ +"""Unit tests for Keycloak OAuth provider - Fixed version.""" + +import os +from unittest.mock import patch +from urllib.parse import parse_qs, urlparse + +import httpx +import pytest + +from fastmcp import FastMCP +from fastmcp.server.auth.oidc_proxy import OIDCConfiguration +from fastmcp.server.auth.providers.jwt import JWTVerifier +from fastmcp.server.auth.providers.keycloak import ( + KeycloakAuthProvider, + KeycloakProviderSettings, +) + +TEST_REALM_URL = "https://keycloak.example.com/realms/test" +TEST_BASE_URL = "https://example.com:8000" +TEST_REQUIRED_SCOPES = ["openid", "profile"] + + +@pytest.fixture +def valid_oidc_configuration_dict(): + """Create a valid OIDC configuration dict for testing.""" + return { + "issuer": TEST_REALM_URL, + "authorization_endpoint": f"{TEST_REALM_URL}/protocol/openid-connect/auth", + "token_endpoint": f"{TEST_REALM_URL}/protocol/openid-connect/token", + "jwks_uri": f"{TEST_REALM_URL}/.well-known/jwks.json", + "registration_endpoint": f"{TEST_REALM_URL}/clients-registrations/openid-connect", + "response_types_supported": ["code", "id_token", "token"], + "subject_types_supported": ["public"], + "id_token_signing_alg_values_supported": ["RS256"], + } + + +@pytest.fixture +def mock_oidc_config(valid_oidc_configuration_dict): + """Create a mock OIDCConfiguration object.""" + return OIDCConfiguration.model_validate(valid_oidc_configuration_dict) + + +def create_minimal_oidc_config(): + """Create a minimal valid OIDC configuration for testing.""" + return OIDCConfiguration.model_validate( + { + "issuer": TEST_REALM_URL, + "authorization_endpoint": f"{TEST_REALM_URL}/protocol/openid-connect/auth", + "token_endpoint": f"{TEST_REALM_URL}/protocol/openid-connect/token", + "jwks_uri": f"{TEST_REALM_URL}/.well-known/jwks.json", + "response_types_supported": ["code"], + "subject_types_supported": ["public"], + "id_token_signing_alg_values_supported": ["RS256"], + } + ) + + +class TestKeycloakProviderSettings: + """Test settings for Keycloak OAuth provider.""" + + def test_settings_from_env_vars(self): + """Test that settings can be loaded from environment variables.""" + with patch.dict( + os.environ, + { + "FASTMCP_SERVER_AUTH_KEYCLOAK_REALM_URL": TEST_REALM_URL, + "FASTMCP_SERVER_AUTH_KEYCLOAK_BASE_URL": TEST_BASE_URL, + "FASTMCP_SERVER_AUTH_KEYCLOAK_REQUIRED_SCOPES": ",".join( + TEST_REQUIRED_SCOPES + ), + }, + ): + # Let environment variables populate the settings + settings = KeycloakProviderSettings.model_validate({}) + + assert str(settings.realm_url) == TEST_REALM_URL + assert str(settings.base_url).rstrip("/") == TEST_BASE_URL + assert settings.required_scopes == TEST_REQUIRED_SCOPES + + def test_settings_explicit_override_env(self): + """Test that explicit settings override environment variables.""" + with patch.dict( + os.environ, + { + "FASTMCP_SERVER_AUTH_KEYCLOAK_REALM_URL": TEST_REALM_URL, + "FASTMCP_SERVER_AUTH_KEYCLOAK_BASE_URL": TEST_BASE_URL, + }, + ): + settings = KeycloakProviderSettings.model_validate( + { + "realm_url": "https://explicit.keycloak.com/realms/explicit", + "base_url": "https://explicit.example.com", + } + ) + + assert ( + str(settings.realm_url) + == "https://explicit.keycloak.com/realms/explicit" + ) + assert str(settings.base_url).rstrip("/") == "https://explicit.example.com" + + @pytest.mark.parametrize( + "scopes_env", + [ + "openid,profile", + '["openid", "profile"]', + ], + ) + def test_settings_parse_scopes(self, scopes_env): + """Test that scopes are parsed correctly from different formats.""" + with patch.dict( + os.environ, + { + "FASTMCP_SERVER_AUTH_KEYCLOAK_REALM_URL": TEST_REALM_URL, + "FASTMCP_SERVER_AUTH_KEYCLOAK_BASE_URL": TEST_BASE_URL, + "FASTMCP_SERVER_AUTH_KEYCLOAK_REQUIRED_SCOPES": scopes_env, + }, + ): + # Let environment variables populate the settings + settings = KeycloakProviderSettings.model_validate({}) + assert settings.required_scopes == ["openid", "profile"] + + +class TestKeycloakAuthProvider: + """Test KeycloakAuthProvider initialization.""" + + def test_init_with_explicit_params(self, mock_oidc_config): + """Test initialization with explicit parameters.""" + with patch.object( + KeycloakAuthProvider, "_discover_oidc_configuration" + ) as mock_discover: + mock_discover.return_value = mock_oidc_config + + provider = KeycloakAuthProvider( + realm_url=TEST_REALM_URL, + base_url=TEST_BASE_URL, + required_scopes=TEST_REQUIRED_SCOPES, + ) + + mock_discover.assert_called_once() + + assert provider.realm_url == TEST_REALM_URL + assert str(provider.base_url) == TEST_BASE_URL + "/" + assert isinstance(provider.token_verifier, JWTVerifier) + assert provider.token_verifier.required_scopes == TEST_REQUIRED_SCOPES + + def test_init_with_env_vars(self, mock_oidc_config): + """Test initialization with environment variables.""" + with ( + patch.dict( + os.environ, + { + "FASTMCP_SERVER_AUTH_KEYCLOAK_REALM_URL": TEST_REALM_URL, + "FASTMCP_SERVER_AUTH_KEYCLOAK_BASE_URL": TEST_BASE_URL, + "FASTMCP_SERVER_AUTH_KEYCLOAK_REQUIRED_SCOPES": ",".join( + TEST_REQUIRED_SCOPES + ), + }, + ), + patch.object( + KeycloakAuthProvider, "_discover_oidc_configuration" + ) as mock_discover, + ): + mock_discover.return_value = mock_oidc_config + + provider = KeycloakAuthProvider() + + mock_discover.assert_called_once() + + assert provider.realm_url == TEST_REALM_URL + assert str(provider.base_url) == TEST_BASE_URL + "/" + assert provider.token_verifier.required_scopes == TEST_REQUIRED_SCOPES + + def test_init_with_custom_token_verifier(self, mock_oidc_config): + """Test initialization with custom token verifier.""" + custom_verifier = JWTVerifier( + jwks_uri=f"{TEST_REALM_URL}/.well-known/jwks.json", + issuer=TEST_REALM_URL, + audience="custom-client-id", + required_scopes=["custom:scope"], + ) + + with patch.object( + KeycloakAuthProvider, "_discover_oidc_configuration" + ) as mock_discover: + mock_discover.return_value = mock_oidc_config + + provider = KeycloakAuthProvider( + realm_url=TEST_REALM_URL, + base_url=TEST_BASE_URL, + token_verifier=custom_verifier, + ) + + assert provider.token_verifier is custom_verifier + assert provider.token_verifier.audience == "custom-client-id" + assert provider.token_verifier.required_scopes == ["custom:scope"] + + +class TestKeycloakOIDCDiscovery: + """Test OIDC configuration discovery.""" + + def test_discover_oidc_configuration_success(self, valid_oidc_configuration_dict): + """Test successful OIDC configuration discovery.""" + with patch( + "fastmcp.server.auth.oidc_proxy.OIDCConfiguration.get_oidc_configuration" + ) as mock_get: + mock_config = OIDCConfiguration.model_validate( + valid_oidc_configuration_dict + ) + mock_get.return_value = mock_config + + KeycloakAuthProvider( + realm_url=TEST_REALM_URL, + base_url=TEST_BASE_URL, + ) + + mock_get.assert_called_once() + call_args = mock_get.call_args + assert ( + str(call_args[0][0]) + == f"{TEST_REALM_URL}/.well-known/openid-configuration" + ) + assert call_args[1]["strict"] is False + + def test_discover_oidc_configuration_with_defaults(self): + """Test OIDC configuration discovery with default values.""" + # Create a minimal config with only required fields but missing optional ones + minimal_config = { + "issuer": TEST_REALM_URL, + "authorization_endpoint": f"{TEST_REALM_URL}/protocol/openid-connect/auth", + "token_endpoint": f"{TEST_REALM_URL}/protocol/openid-connect/token", + "jwks_uri": f"{TEST_REALM_URL}/.well-known/jwks.json", # Required field + "response_types_supported": ["code"], + "subject_types_supported": ["public"], + "id_token_signing_alg_values_supported": ["RS256"], + # Missing registration_endpoint - this should get default + } + + with patch( + "fastmcp.server.auth.oidc_proxy.OIDCConfiguration.get_oidc_configuration" + ) as mock_get: + mock_config = OIDCConfiguration.model_validate(minimal_config) + mock_get.return_value = mock_config + + provider = KeycloakAuthProvider( + realm_url=TEST_REALM_URL, + base_url=TEST_BASE_URL, + ) + + # Check that defaults were applied for missing optional fields + config = provider.oidc_config + assert config.jwks_uri == f"{TEST_REALM_URL}/.well-known/jwks.json" + assert config.issuer == TEST_REALM_URL + assert ( + config.registration_endpoint + == f"{TEST_REALM_URL}/clients-registrations/openid-connect" + ) + + +class TestKeycloakRoutes: + """Test Keycloak auth provider routes.""" + + @pytest.fixture + def keycloak_provider(self, mock_oidc_config): + """Create a KeycloakAuthProvider for testing.""" + with patch.object( + KeycloakAuthProvider, "_discover_oidc_configuration" + ) as mock_discover: + mock_discover.return_value = mock_oidc_config + return KeycloakAuthProvider( + realm_url=TEST_REALM_URL, + base_url=TEST_BASE_URL, + required_scopes=TEST_REQUIRED_SCOPES, + ) + + def test_get_routes_includes_all_endpoints(self, keycloak_provider): + """Test that get_routes returns all required endpoints.""" + routes = keycloak_provider.get_routes() + + # Should have RemoteAuthProvider routes plus Keycloak-specific ones + assert len(routes) >= 4 + + paths = [route.path for route in routes] + assert "/.well-known/oauth-protected-resource" in paths + assert "/.well-known/oauth-authorization-server" in paths + assert "/register" in paths + assert "/authorize" in paths + + async def test_oauth_authorization_server_metadata_endpoint( + self, keycloak_provider + ): + """Test the OAuth authorization server metadata endpoint.""" + mcp = FastMCP("test-server", auth=keycloak_provider) + mcp_http_app = mcp.http_app() + + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=mcp_http_app), + base_url=TEST_BASE_URL, + ) as client: + response = await client.get("/.well-known/oauth-authorization-server") + + assert response.status_code == 200 + data = response.json() + + # Check that the metadata includes FastMCP proxy endpoints + assert data["registration_endpoint"] == f"{TEST_BASE_URL}/register" + assert data["authorization_endpoint"] == f"{TEST_BASE_URL}/authorize" + assert data["issuer"] == TEST_REALM_URL + assert data["jwks_uri"] == f"{TEST_REALM_URL}/.well-known/jwks.json" + + +class TestKeycloakClientRegistrationProxy: + """Test client registration proxy functionality.""" + + @pytest.fixture + def keycloak_provider(self, mock_oidc_config): + """Create a KeycloakAuthProvider for testing.""" + with patch.object( + KeycloakAuthProvider, "_discover_oidc_configuration" + ) as mock_discover: + mock_discover.return_value = mock_oidc_config + return KeycloakAuthProvider( + realm_url=TEST_REALM_URL, + base_url=TEST_BASE_URL, + required_scopes=TEST_REQUIRED_SCOPES, + ) + + async def test_register_client_proxy_endpoint_exists(self, keycloak_provider): + """Test that the client registration proxy endpoint exists.""" + mcp = FastMCP("test-server", auth=keycloak_provider) + mcp_http_app = mcp.http_app() + + # Test that the endpoint exists by making a request + # We'll expect it to fail due to missing mock, but should not be a 404 + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=mcp_http_app), + base_url=TEST_BASE_URL, + ) as client: + response = await client.post( + "/register", + json={ + "redirect_uris": ["http://localhost:8000/callback"], + "client_name": "test-client", + }, + ) + + # Should not be 404 (endpoint exists) but will be 500 due to no mock + assert response.status_code != 404 + + +class TestKeycloakAuthorizationProxy: + """Test authorization proxy functionality.""" + + @pytest.fixture + def keycloak_provider(self, mock_oidc_config): + """Create a KeycloakAuthProvider for testing.""" + with patch.object( + KeycloakAuthProvider, "_discover_oidc_configuration" + ) as mock_discover: + mock_discover.return_value = mock_oidc_config + return KeycloakAuthProvider( + realm_url=TEST_REALM_URL, + base_url=TEST_BASE_URL, + required_scopes=TEST_REQUIRED_SCOPES, + ) + + async def test_authorize_proxy_with_scope_injection(self, keycloak_provider): + """Test authorization proxy with scope injection.""" + mcp = FastMCP("test-server", auth=keycloak_provider) + mcp_http_app = mcp.http_app() + + params = { + "client_id": "test-client", + "redirect_uri": "http://localhost:8000/callback", + "response_type": "code", + "state": "test-state", + } + + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=mcp_http_app), + base_url=TEST_BASE_URL, + follow_redirects=False, + ) as client: + response = await client.get("/authorize", params=params) + + assert response.status_code == 302 + + # Parse the redirect URL + location = response.headers["location"] + parsed_url = urlparse(location) + query_params = parse_qs(parsed_url.query) + + # Check that scope was injected + assert "scope" in query_params + injected_scopes = query_params["scope"][0].split(" ") + assert set(injected_scopes) == set(TEST_REQUIRED_SCOPES) + + # Check other parameters are preserved + assert query_params["client_id"][0] == "test-client" + assert query_params["redirect_uri"][0] == "http://localhost:8000/callback" + assert query_params["response_type"][0] == "code" + assert query_params["state"][0] == "test-state" + + +class TestKeycloakEdgeCases: + """Test edge cases and error conditions for KeycloakAuthProvider.""" + + def test_malformed_oidc_configuration_handling(self): + """Test handling of OIDC configuration with missing optional fields.""" + # Create a config with all required fields but missing some optional ones + config_with_missing_optionals = { + "issuer": TEST_REALM_URL, + "authorization_endpoint": f"{TEST_REALM_URL}/protocol/openid-connect/auth", + "token_endpoint": f"{TEST_REALM_URL}/protocol/openid-connect/token", + "jwks_uri": f"{TEST_REALM_URL}/.well-known/jwks.json", # Required + "response_types_supported": ["code"], + "subject_types_supported": ["public"], + "id_token_signing_alg_values_supported": ["RS256"], + # Missing registration_endpoint (optional) + } + + with patch( + "fastmcp.server.auth.oidc_proxy.OIDCConfiguration.get_oidc_configuration" + ) as mock_get: + # First return the config without optional fields + mock_config = OIDCConfiguration.model_validate( + config_with_missing_optionals + ) + mock_get.return_value = mock_config + + provider = KeycloakAuthProvider( + realm_url=TEST_REALM_URL, + base_url=TEST_BASE_URL, + ) + + # Should apply defaults for missing optional fields + config = provider.oidc_config + assert config.jwks_uri == f"{TEST_REALM_URL}/.well-known/jwks.json" + assert ( + config.registration_endpoint + == f"{TEST_REALM_URL}/clients-registrations/openid-connect" + ) + + def test_empty_required_scopes_handling(self): + """Test handling of empty required scopes.""" + with patch.object( + KeycloakAuthProvider, "_discover_oidc_configuration" + ) as mock_discover: + mock_discover.return_value = create_minimal_oidc_config() + + provider = KeycloakAuthProvider( + realm_url=TEST_REALM_URL, + base_url=TEST_BASE_URL, + required_scopes=[], + ) + + assert provider.token_verifier.required_scopes == [] + + def test_realm_url_with_trailing_slash(self): + """Test handling of realm URL with trailing slash.""" + realm_url_with_slash = TEST_REALM_URL + "/" + + with patch.object( + KeycloakAuthProvider, "_discover_oidc_configuration" + ) as mock_discover: + mock_discover.return_value = create_minimal_oidc_config() + + provider = KeycloakAuthProvider( + realm_url=realm_url_with_slash, + base_url=TEST_BASE_URL, + ) + + # Should normalize by removing trailing slash + assert provider.realm_url == TEST_REALM_URL