-
Notifications
You must be signed in to change notification settings - Fork 72
Adding encryption example using a KMS and JWT-based auth #138
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
30bf2d6
92fd61b
c15d3f3
63dca35
ffdffd6
0ee9a2f
5afc265
aacfa07
5c4551b
cc6893f
a47c21e
cf70c45
ed60aad
72cd501
898eebc
208bd1f
03d5a18
c29dc87
af6e621
b4583f5
55a1d20
e44d1c9
6d2f6a5
374ac88
9c8dd79
fc6e94b
ccc983c
e3f76d5
1c3060f
be9aec4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
_certs |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
# Encryption with Temporal user role access | ||
|
||
This sample demonstrates: | ||
|
||
- CORS settings to allow connections to a codec server | ||
- using a KMS key to encrypt/decrypt payloads | ||
- extracting data from a JWT | ||
- controlling decyption based on a user's Temporal Cloud role | ||
|
||
The Codec Server uses the [Operations API](https://docs.temporal.io/ops) to get user information. It would be helpful to be familiar with the API's requirements. This API is currently a beta relase and may change in the future. | ||
|
||
## Install | ||
|
||
For this sample, the optional `encryption_jwt` and `bedrock` dependency groups must be included. To include, run: | ||
|
||
```sh | ||
poetry install --with encryption_jwt,bedrock | ||
``` | ||
|
||
## Setup | ||
|
||
> [!WARNING] | ||
> You must connect your Worker(s) to Temporal Cloud to see decryption working in the Web UI. | ||
|
||
### Key management | ||
|
||
This example uses the [AWS Key Management Service](https://aws.amazon.com/kms/) (KMS). You will need | ||
to create a "Customer managed key" with its Alias set to your Temporal Namespace (replace `.`s with `_`s). | ||
Alternately replace the key management portion with your own implementation. | ||
|
||
### Self-signed certificates | ||
|
||
The codec server will need to use HTTPS, self-signed certificates will work in the development | ||
environment. Run the following command in a `_certs` directory that's a subdirectory of this one. | ||
It will create certificate files that are good for 10 years. | ||
|
||
```sh | ||
openssl req -x509 -newkey rsa:4096 -sha256 -days 3650 -nodes -keyout localhost.key -out localhost.pem -subj "/CN=localhost" | ||
``` | ||
|
||
In the projects you can access the files using the following relative paths. | ||
|
||
- `./_certs/localhost.pem` | ||
- `./_certs/localhost.key` | ||
|
||
## Run | ||
|
||
### Worker | ||
|
||
To run, first see the [repo README.md](../README.md) for prerequisites. | ||
|
||
Before starting the worker, open a terminal and add the following environment variables with | ||
appropriate values: | ||
|
||
```sh | ||
export TEMPORAL_ADDRESS=<your temporal domain and port> | ||
export TEMPORAL_TLS_CERT=<path to the crt file used to generate the CA Certificate for the temporal namespace> | ||
export TEMPORAL_TLS_KEY=<path to the key file used to generate the CA Certificate for the temporal namespace> | ||
export AWS_ACCESS_KEY_ID=<AWS account access key> | ||
export AWS_SECRET_ACCESS_KEY=<AWS account secret key> | ||
export AWS_SESSION_TOKEN=<AWS session token> | ||
``` | ||
|
||
In the same terminal start the worker: | ||
phillipskevin marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
```sh | ||
poetry run python worker.py <namespace> | ||
``` | ||
|
||
> [!Note] | ||
> You will need to run at least one Worker per-namespace. | ||
|
||
### Codec server | ||
|
||
The codec server allows you to see the encrypted payloads of workflows in the Web UI. The server | ||
must be started with secure connections (HTTPS), you will need the paths to a pem (crt) and key | ||
file. [Self-signed certificates](#self-signed-certificates) will work just fine. | ||
|
||
You will also need a [Temporal API Key](https://docs.temporal.io/cloud/api-keys#generate-an-api-key). It's value is set using the `TEMPORAL_API_KEY` env var. | ||
|
||
Open a new terminal and add the following environment variables with values: | ||
|
||
```sh | ||
export TEMPORAL_TLS_CERT=<path to the crt file used to generate the CA Certificate for the namespace> | ||
export TEMPORAL_TLS_KEY=<path to the key file used to generate the CA Certificate for the namespace> | ||
export TEMPORAL_API_KEY=<An API key> # see https://docs.temporal.io/cloud/tcld/apikey#create | ||
export TEMPORAL_OPS_ADDRESS=saas-api.tmprl.cloud:443 # uses "saas-api.tmprl.cloud:443" if not provided | ||
export TEMPORAL_OPS_API_VERSION=2024-05-13-00 | ||
export AWS_ACCESS_KEY_ID=<AWS account access key> | ||
export AWS_SECRET_ACCESS_KEY=<AWS account secret key> | ||
export AWS_SESSION_TOKEN=<AWS session token> | ||
export SSL_PEM=<path to self-signed pem (crt) file> | ||
export SSL_KEY=<path to self-signed key file> | ||
``` | ||
|
||
In the same terminal start the codec server: | ||
|
||
```sh | ||
poetry run python codec_server.py | ||
``` | ||
|
||
### Execute workflow | ||
|
||
In a third terminal, add the environment variables: | ||
|
||
```txt | ||
export TEMPORAL_ADDRESS=<your temporal domain and port> | ||
export TEMPORAL_TLS_CERT=<path to the crt file used to generate the CA Certificate for the namespace> | ||
export TEMPORAL_TLS_KEY=<path to the key file used to generate the CA Certificate for the namespace> | ||
``` | ||
|
||
Then run the command to execute the workflow: | ||
|
||
```sh | ||
poetry run python starter.py <namespace> | ||
``` | ||
|
||
The workflow should complete with the hello result. To view the workflow, use [temporal](https://docs.temporal.io/cli): | ||
|
||
```sh | ||
temporal workflow show --workflow-id encryption-workflow-id | ||
``` | ||
|
||
Note how the result looks (with wrapping removed): | ||
|
||
```txt | ||
Output:[encoding binary/encrypted: payload encoding is not supported] | ||
``` | ||
|
||
This is because the data is encrypted and not visible. | ||
|
||
## Temporal Web UI | ||
|
||
Open the Web UI and select a workflow, you'll only see encrypted results. To see decrypted results: | ||
|
||
- You must have the Temporal role of "admin" | ||
- The codec server must be running | ||
- Set the "Remote Codec Endpoint" in the web UI to the codec server domain: `https://localhost:8081` | ||
- Both the "Pass the user access token" and "Include cross-origin credentials" must be enabled | ||
|
||
Once those requirements are met you can then see the unencrypted results. This is possible because | ||
CORS settings in the codec server allow the browser to access the codec server directly over | ||
localhost. Decrypted data never leaves your local machine. See [Codec | ||
Server](https://docs.temporal.io/production-deployment/data-encryption) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
from typing import Iterable, List | ||
|
||
from temporalio.api.common.v1 import Payload | ||
from temporalio.converter import PayloadCodec | ||
|
||
from encryption_jwt.encryptor import KMSEncryptor | ||
|
||
|
||
class EncryptionCodec(PayloadCodec): | ||
def __init__(self, namespace: str): | ||
self._encryptor = KMSEncryptor(namespace) | ||
|
||
async def encode(self, payloads: Iterable[Payload]) -> List[Payload]: | ||
# We blindly encode all payloads with the key and set the metadata with the key that was | ||
# used (base64 encoded). | ||
|
||
async def encrypt_payload(p: Payload): | ||
data, key = await self._encryptor.encrypt(p.SerializeToString()) | ||
return Payload( | ||
metadata={ | ||
"encoding": b"binary/encrypted", | ||
"data_key_encrypted": key, | ||
}, | ||
data=data, | ||
) | ||
|
||
# return list(map(encrypt_payload, payloads)) | ||
return [await encrypt_payload(payload) for payload in payloads] | ||
|
||
async def decode(self, payloads: Iterable[Payload]) -> List[Payload]: | ||
async def decrypt_payload(p: Payload): | ||
data_key_encrypted_base64 = p.metadata.get("data_key_encrypted", b"") | ||
data = await self._encryptor.decrypt(data_key_encrypted_base64, p.data) | ||
return Payload.FromString(data) | ||
|
||
# return list(map(decrypt_payload, payloads)) | ||
return [await decrypt_payload(payload) for payload in payloads] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,165 @@ | ||
import logging | ||
import os | ||
import ssl | ||
|
||
import jwt | ||
import requests | ||
from aiohttp import hdrs, web | ||
from google.protobuf import json_format | ||
from jwt import PyJWK | ||
from jwt.algorithms import RSAAlgorithm | ||
from temporalio.api.cloud.cloudservice.v1 import GetUsersRequest | ||
from temporalio.api.common.v1 import Payloads | ||
from temporalio.client import CloudOperationsClient | ||
|
||
from encryption_jwt.codec import EncryptionCodec | ||
|
||
AUTHORIZED_ACCOUNT_ACCESS_ROLES = ["owner", "admin"] | ||
AUTHORIZED_NAMESPACE_ACCESS_ROLES = ["read", "write", "admin"] | ||
|
||
TEMPORAL_CLIENT_CLOUD_API_VERSION = "2024-05-13-00" | ||
|
||
temporal_ops_address = ( | ||
os.environ.get("TEMPORAL_OPS_ADDRESS") or "saas-api.tmprl.cloud:443" | ||
) | ||
|
||
|
||
def build_codec_server() -> web.Application: | ||
# Cors handler | ||
async def cors_options(req: web.Request) -> web.Response: | ||
resp = web.Response() | ||
|
||
if req.headers.get(hdrs.ORIGIN) == "http://localhost:8080": | ||
logger.info("Setting CORS headers for localhost") | ||
resp.headers[hdrs.ACCESS_CONTROL_ALLOW_ORIGIN] = "http://localhost:8080" | ||
|
||
elif req.headers.get(hdrs.ORIGIN) == "https://cloud.temporal.io": | ||
logger.info("Setting CORS headers for cloud.temporal.io") | ||
resp.headers[hdrs.ACCESS_CONTROL_ALLOW_ORIGIN] = "https://cloud.temporal.io" | ||
|
||
allow_headers = "content-type,x-namespace" | ||
if req.scheme.lower() == "https": | ||
allow_headers += ",authorization" | ||
resp.headers[hdrs.ACCESS_CONTROL_ALLOW_CREDENTIALS] = "true" | ||
|
||
# common | ||
resp.headers[hdrs.ACCESS_CONTROL_ALLOW_METHODS] = "POST" | ||
resp.headers[hdrs.ACCESS_CONTROL_ALLOW_HEADERS] = allow_headers | ||
|
||
return resp | ||
|
||
async def decryption_authorized(email: str, namespace: str) -> bool: | ||
client = await CloudOperationsClient.connect( | ||
api_key=os.environ.get("TEMPORAL_API_KEY"), | ||
version=TEMPORAL_CLIENT_CLOUD_API_VERSION, | ||
) | ||
|
||
response = await client.cloud_service.get_users( | ||
GetUsersRequest(namespace=namespace) | ||
) | ||
|
||
for user in response.users: | ||
if user.spec.email.lower() == email.lower(): | ||
if ( | ||
user.spec.access.account_access.role | ||
in AUTHORIZED_ACCOUNT_ACCESS_ROLES | ||
): | ||
return True | ||
else: | ||
if namespace in user.spec.access.namespace_accesses: | ||
if ( | ||
user.spec.access.namespace_accesses[namespace].permission | ||
in AUTHORIZED_NAMESPACE_ACCESS_ROLES | ||
): | ||
return True | ||
|
||
return False | ||
|
||
def make_handler(fn: str): | ||
async def handler(req: web.Request): | ||
namespace = req.headers.get("x-namespace") or "default" | ||
auth_header = req.headers.get("Authorization") or "" | ||
_bearer, encoded = auth_header.split(" ") | ||
|
||
# Extract the kid from the Auth header | ||
jwt_dict = jwt.get_unverified_header(encoded) | ||
kid = jwt_dict["kid"] | ||
algorithm = jwt_dict["alg"] | ||
|
||
# Fetch Temporal Cloud JWKS | ||
jwks_url = "https://login.tmprl.cloud/.well-known/jwks.json" | ||
jwks = requests.get(jwks_url).json() | ||
|
||
# Extract Temporal Cloud's public key | ||
pyjwk = None | ||
for key in jwks["keys"]: | ||
if key["kid"] == kid: | ||
# Convert JWKS key to PEM format | ||
pyjwk = PyJWK.from_dict(key) | ||
break | ||
|
||
if pyjwk is None: | ||
raise ValueError("Public key not found in JWKS") | ||
|
||
# Decode the jwt, verifying against Temporal Cloud's public key | ||
decoded = jwt.decode( | ||
encoded, | ||
pyjwk.key, | ||
algorithms=[algorithm], | ||
audience=[ | ||
"https://saas-api.tmprl.cloud", | ||
"https://prod-tmprl.us.auth0.com/userinfo", | ||
], | ||
) | ||
|
||
# Use the email to determine if the user is authorized to decrypt the payload | ||
authorized = await decryption_authorized( | ||
decoded["https://saas-api.tmprl.cloud/user/email"], namespace | ||
) | ||
|
||
if authorized: | ||
# Read payloads as JSON | ||
assert req.content_type == "application/json" | ||
payloads = json_format.Parse(await req.read(), Payloads()) | ||
encryptionCodec = EncryptionCodec(namespace) | ||
payloads = Payloads( | ||
payloads=await getattr(encryptionCodec, fn)(payloads.payloads) | ||
) | ||
|
||
# Apply CORS and return JSON | ||
resp = await cors_options(req) | ||
resp.content_type = "application/json" | ||
resp.text = json_format.MessageToJson(payloads) | ||
return resp | ||
|
||
return handler | ||
|
||
# Build app | ||
app = web.Application() | ||
# set up logger | ||
logging.basicConfig(level=logging.DEBUG) | ||
logger = logging.getLogger(__name__) | ||
app.add_routes( | ||
[ | ||
web.post("/encode", make_handler("encode")), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to expose the encode over http? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's a good idea to. Our web UI and CLI use the encode side when you are starting workflows, sending queries, etc from those interfaces. |
||
web.post("/decode", make_handler("decode")), | ||
web.options("/decode", cors_options), | ||
] | ||
) | ||
|
||
return app | ||
|
||
|
||
if __name__ == "__main__": | ||
# pylint: disable=C0103 | ||
ssl_context = None | ||
if os.environ.get("SSL_PEM") and os.environ.get("SSL_KEY"): | ||
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) | ||
ssl_context.check_hostname = False | ||
ssl_context.load_cert_chain( | ||
os.environ.get("SSL_PEM") or "", os.environ.get("SSL_KEY") or "" | ||
) | ||
|
||
web.run_app( | ||
build_codec_server(), host="0.0.0.0", port=8081, ssl_context=ssl_context | ||
) |
Uh oh!
There was an error while loading. Please reload this page.