-
Notifications
You must be signed in to change notification settings - Fork 1
Add foundational components for EAR generation (JWT creation and signing) and CI setup #12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 15 commits
abdfa1e
b6224c0
f41a8a5
d6e58f2
ff5410d
4234f62
fb1846c
1c86eb9
31de3a1
d7d2aac
6da16f4
2187370
3c3d806
b6ed9ec
bc09227
3e6eb74
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,2 @@ | ||
| [flake8] | ||
| max-line-length = 88 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,34 @@ | ||
| name: Run Tox on PR | ||
|
|
||
| on: | ||
| pull_request: | ||
| branches: | ||
| - main | ||
| - '**' # Run on all branches for PRs | ||
|
|
||
| jobs: | ||
| tox-tests: | ||
| runs-on: ubuntu-latest | ||
|
|
||
| strategy: | ||
| matrix: | ||
| python-version: [3.9, 3.11] # Test against multiple Python versions | ||
|
|
||
| steps: | ||
| # Checkout the code | ||
| - name: Checkout code | ||
| uses: actions/checkout@v3 | ||
|
|
||
| # Setup Python | ||
| - name: Set up Python ${{ matrix.python-version }} | ||
| uses: actions/setup-python@v4 | ||
| with: | ||
| python-version: ${{ matrix.python-version }} | ||
|
|
||
| # Install tox | ||
| - name: Install tox | ||
| run: pip install tox | ||
|
|
||
| # Run tox | ||
| - name: Run tox | ||
| run: tox |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,8 +1,9 @@ | ||
| [MESSAGES CONTROL] | ||
| disable = C0114, C0115, C0116 ; Disable missing module/class/function docstring warnings | ||
| disable = C0114, C0115, C0116, redefined-outer-name, duplicate-code | ||
|
|
||
| [FORMAT] | ||
| max-line-length = 88 ; Match Black's default line length | ||
| max-attributes=10 | ||
|
|
||
| [MASTER] | ||
| ignore = venv ; Ignore virtual environment folder |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,26 +1,68 @@ | ||
| # python-ear | ||
| # **python-ear** | ||
|
|
||
| A python implementation of [draft-fv-rats-ear](https://datatracker.ietf.org/doc/draft-fv-rats-ear/). | ||
| A Python library that implements the EAT Attestation Result (EAR) data format, as specified in [draft-fv-rats-ear](https://datatracker.ietf.org/doc/draft-fv-rats-ear/). This library provides implementations for both CBOR-based and JSON-based serialisations. | ||
|
|
||
| # Proposal | ||
| --- | ||
|
|
||
| Following are the tools that will be used in the development of this library | ||
| ## **Overview** | ||
|
|
||
| ## CWT and JWT creation | ||
| The goal of this project is to standardize attestation results by defining a shared information and data model, enabling seamless integration with other components of the RATS architecture. This focuses specifically on harmonizing attestation results to facilitate interoperability between various verifiers and relying parties. | ||
|
|
||
| 1. [python-cwt](https://python-cwt.readthedocs.io/en/stable/) | ||
| 2. [python-jwt](https://pypi.org/project/python-jose/) | ||
| This implementation was initiated as part of the **Veraison Mentorship** under the Linux Foundation Mentorship Program (**LFX Mentorship**), focusing on the following capabilities: | ||
|
|
||
| ## Code formatting and styling | ||
| - **Populating EAR Claims-Sets:** Define and populate claims that represent evidence and attestation results. | ||
| - **Signing EAR Claims-Sets:** Support signing using private keys, ensuring data integrity and authenticity. | ||
| - **Encoding and Decoding:** | ||
| - Encode signed EAR claims as **CWT** (Concise Binary Object Representation Web Tokens) or **JWT** (JSON Web Tokens). | ||
| - Decode signed EARs from CWT or JWT formats, enabling interoperability between different systems. | ||
| - **Signature Verification:** Verify signatures using public keys to ensure the authenticity of claims. | ||
| - **Accessing Claims:** Provide interfaces to access and manage EAR claims efficiently. | ||
|
|
||
| 1. [black](https://pypi.org/project/black/) | ||
| 2. [isort](https://pypi.org/project/isort/) | ||
| This library is developed in Python and makes use of existing packages for CWT and JWT management, static code analysis, and testing. | ||
|
|
||
| ## Linting and static analysis | ||
| --- | ||
|
|
||
| 1. [flake8](https://pypi.org/project/flake8/) | ||
| 2. [mypy](https://pypi.org/project/mypy/) | ||
| ## **Key Features** | ||
|
|
||
| ## Testing | ||
| 1. **Standards Compliance:** | ||
| Implements draft-fv-rats-ear as per IETF specifications to ensure compatibility with the RATS architecture. | ||
|
|
||
| 1. [pytest](https://pypi.org/project/pytest/) | ||
| 2. **Token Management:** | ||
| - **CWT Support:** Utilizes [python-cwt](https://python-cwt.readthedocs.io/en/stable/) for handling CBOR Web Tokens. | ||
| - **JWT Support:** Uses [python-jose](https://pypi.org/project/python-jose/) for JSON Web Tokens management. | ||
|
|
||
| 3. **Security:** | ||
| - Supports signing of EAR claims with private keys and verification with public keys. | ||
| - Adopts secure cryptographic practices for token creation and verification. | ||
|
|
||
| 4. **Static Analysis and Code Quality:** | ||
| - Ensures code quality using linters and static analysis tools. | ||
| - Maintains type safety and code consistency. | ||
|
|
||
| 5. **Testing:** | ||
| - Comprehensive unit tests using `pytest` to validate all functionalities. | ||
|
|
||
| --- | ||
|
|
||
| ## **Technical Stack** | ||
|
|
||
| ### **Token Creation and Management** | ||
|
|
||
| - **CWT:** [python-cwt](https://python-cwt.readthedocs.io/en/stable/) | ||
| - **JWT:** [python-jose](https://pypi.org/project/python-jose/) | ||
|
|
||
| ### **Code Formatting and Styling** | ||
|
|
||
| - **black:** Ensures consistent code formatting. | ||
| - **isort:** Manages import statements. | ||
|
|
||
| ### **Linting and Static Analysis** | ||
|
|
||
| - **flake8:** For PEP 8 compliance and linting. | ||
| - **mypy:** Static type checking. | ||
| - **pyright:** Advanced type checking for Python. | ||
| - **pylint:** Code analysis for error detection and enforcing coding standards. | ||
|
|
||
| ### **Testing** | ||
|
|
||
| - **pytest:** Framework for writing and executing tests. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,99 @@ | ||
| import json | ||
| from abc import ABC | ||
| from collections import namedtuple | ||
| from typing import Any, ClassVar, Dict, Tuple, Type, TypeVar, Union, get_args | ||
|
|
||
| T = TypeVar("T", bound="BaseJCSerializable") | ||
|
|
||
| KeyMapping = namedtuple("KeyMapping", ["int_key", "str_key"]) | ||
|
|
||
|
|
||
| def to_data(value: Any, keys_as_int=False) -> Any: | ||
| if hasattr(value, "to_data"): | ||
| return value.to_data(keys_as_int) | ||
| if hasattr(value, "items"): # dict-like | ||
| return { | ||
| to_data(k, keys_as_int): to_data(v, keys_as_int) for k, v in value.items() | ||
| } | ||
| if hasattr(value, "__iter__") and not isinstance(value, str): # list-like | ||
| return [to_data(v, keys_as_int) for v in value] | ||
|
|
||
| if hasattr( | ||
| value, "value" | ||
| ): # custom classes that have value attr but don't have 'to_data' | ||
| return value.value # type: ignore[attr-defined] | ||
| # scalar and no to_data(), so assume serializable as-is | ||
| return value | ||
|
|
||
|
|
||
| class BaseJCSerializable(ABC): | ||
| jc_map: ClassVar[Dict[str, Tuple[int, str]]] | ||
|
|
||
| def to_data(self, keys_as_int=False) -> Dict[Union[str, int], Any]: | ||
| return { | ||
| (int_key if keys_as_int else str_key): to_data( | ||
| getattr(self, attr), keys_as_int | ||
| ) | ||
| for attr, (int_key, str_key) in self.jc_map.items() | ||
| } | ||
|
|
||
| @classmethod | ||
| def from_data(cls: Type[T], data: dict, keys_as_int=False) -> T: | ||
| key_attr = "int_key" if keys_as_int else "str_key" | ||
| init_kwargs = {} | ||
| reverse_map = { | ||
| getattr(mapping, key_attr): attr for attr, mapping in cls.jc_map.items() | ||
| } | ||
|
|
||
| for key, value in data.items(): | ||
| if key not in reverse_map: | ||
| continue | ||
|
|
||
| attr = reverse_map[key] | ||
| field_type = getattr(cls, "__annotations__", {}).get(attr) | ||
| if field_type is None: | ||
| continue | ||
|
|
||
| args = get_args(field_type) | ||
|
|
||
| if hasattr(field_type, "from_data"): | ||
| # Direct object | ||
| init_kwargs[attr] = field_type.from_data(value, keys_as_int=keys_as_int) | ||
|
|
||
| elif hasattr(field_type, "items") and hasattr(args[1], "from_data"): | ||
| # Dict[str | int, CustomClass] | ||
| init_kwargs[attr] = { | ||
| k: args[1].from_data(v, keys_as_int=keys_as_int) | ||
| for k, v in value.items() | ||
| } | ||
|
|
||
| elif args: | ||
| # custom classes that dont have 'from_data' | ||
| init_kwargs[attr] = args[0](value) | ||
|
|
||
| else: | ||
| init_kwargs[attr] = field_type(value) | ||
|
|
||
| return cls(**init_kwargs) | ||
|
|
||
| def to_dict(self) -> Dict[str, Any]: | ||
| # default str_keys | ||
| return self.to_data() # type: ignore[return-value] # pyright: ignore[reportGeneralTypeIssues] # noqa: E501 # pylint: disable=line-too-long | ||
|
|
||
| def to_int_keys(self) -> Dict[Union[str, int], Any]: | ||
| return self.to_data(keys_as_int=True) | ||
|
|
||
| @classmethod | ||
| def from_dict(cls: Type[T], data: Dict[str, Any]) -> T: | ||
| return cls.from_data(data) | ||
|
|
||
| @classmethod | ||
| def from_int_keys(cls: Type[T], data: Dict[int, Any]) -> T: | ||
| return cls.from_data(data, keys_as_int=True) | ||
|
|
||
| @classmethod | ||
| def from_json(cls, json_str: str): | ||
| return cls.from_dict(json.loads(json_str)) | ||
|
|
||
| def to_json(self): | ||
| return json.dumps(self.to_data()) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,35 +1,76 @@ | ||
| import json | ||
| from dataclasses import dataclass, field | ||
| from typing import Any, Dict | ||
| from datetime import datetime, timedelta | ||
| from typing import Dict | ||
|
|
||
| from jose import jwt # type: ignore # pylint: disable=import-error | ||
|
|
||
| from src.base import BaseJCSerializable, KeyMapping | ||
| from src.errors import EARValidationError | ||
| from src.jwt_config import DEFAULT_ALGORITHM, DEFAULT_EXPIRATION_MINUTES | ||
| from src.submod import Submod | ||
| from src.verifier_id import VerifierID | ||
|
|
||
|
|
||
| # https://datatracker.ietf.org/doc/draft-fv-rats-ear/ | ||
| @dataclass | ||
| class EARClaims: | ||
| class AttestationResult(BaseJCSerializable): | ||
| profile: str | ||
| issued_at: int | ||
| verifier_id: Dict[str, str] = field(default_factory=dict) | ||
| submods: Dict[str, Any] = field(default_factory=dict) | ||
| verifier_id: VerifierID | ||
| submods: Dict[str, Submod] = field(default_factory=dict) | ||
|
|
||
| def to_dict(self) -> Dict[str, Any]: | ||
| return { | ||
| "eat_profile": self.profile, | ||
| "iat": self.issued_at, | ||
| "ear.verifier-id": self.verifier_id, | ||
| "submods": self.submods, | ||
| } | ||
| # https://www.ietf.org/archive/id/draft-ietf-rats-eat-31.html#section-7.2.4 | ||
| jc_map = { | ||
| "profile": KeyMapping(265, "profile"), | ||
| "issued_at": KeyMapping(6, "issued_at"), | ||
| "verifier_id": KeyMapping(1004, "verifier_id"), | ||
|
||
| "submods": KeyMapping(266, "submods"), | ||
| } | ||
|
|
||
| @classmethod | ||
| def from_dict(cls, data: Dict[str, Any]): | ||
| return cls( | ||
| profile=data.get("eat_profile", ""), | ||
| issued_at=data.get("iat", 0), | ||
| verifier_id=data.get("ear.verifier-id", {}), | ||
| submods=data.get("submods", {}), | ||
| ) | ||
| def validate(self): | ||
| # Validates an AttestationResult object | ||
| if not isinstance(self.profile, str) or not self.profile: | ||
| raise EARValidationError( | ||
| "AttestationResult profile must be a non-empty string" | ||
| ) | ||
| if not isinstance(self.issued_at, int) or self.issued_at <= 0: | ||
| raise EARValidationError( | ||
| "AttestationResult issued_at must be a positive integer" | ||
| ) | ||
|
|
||
| def to_json(self) -> str: | ||
| return json.dumps(self.to_dict()) | ||
| self.verifier_id.validate() | ||
|
|
||
| for submod, details in self.submods.items(): | ||
| if not isinstance(details, Submod): | ||
| raise EARValidationError( | ||
| f"Submodule {submod} must contain a valid trust_vector and status" | ||
| ) | ||
|
|
||
| trust_vector = details.trust_vector | ||
| trust_vector.validate() | ||
|
|
||
| def encode_jwt( | ||
| self, | ||
| secret_key: str, | ||
| algorithm: str = DEFAULT_ALGORITHM, | ||
| expiration_minutes: int = DEFAULT_EXPIRATION_MINUTES, | ||
| ) -> str: | ||
| # Signs an AttestationResult object and returns a JWT | ||
| payload = self.to_dict() | ||
| payload["exp"] = int( | ||
| datetime.timestamp(datetime.now() + timedelta(minutes=expiration_minutes)) | ||
| ) | ||
| return jwt.encode( | ||
| payload, secret_key, algorithm=algorithm | ||
| ) # pyright: ignore[reportGeneralTypeIssues] | ||
|
|
||
| @classmethod | ||
| def from_json(cls, json_str: str): | ||
| return cls.from_dict(json.loads(json_str)) | ||
| def decode_jwt( | ||
| cls, token: str, secret_key: str, algorithm: str = DEFAULT_ALGORITHM | ||
| ): | ||
| # Verifies a JWT and returns the decoded AttestationResult object. | ||
| try: | ||
| payload = jwt.decode(token, secret_key, algorithms=[algorithm]) | ||
| return cls.from_dict(payload) | ||
| except Exception as exc: | ||
| raise ValueError(f"JWT decoding failed: {exc}") from exc | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| class EARValidationError(Exception): | ||
| # Custom exception for validation errors in AttestationResult | ||
| pass |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,42 @@ | ||
| from datetime import datetime | ||
|
|
||
| from src.claims import AttestationResult | ||
| from src.jwt_config import generate_secret_key | ||
| from src.submod import Submod | ||
| from src.trust_claims import TRUSTWORTHY_INSTANCE_CLAIM, UNRECOGNIZED_INSTANCE_CLAIM | ||
| from src.trust_tier import TRUST_TIER_AFFIRMING, TRUST_TIER_CONTRAINDICATED | ||
| from src.trust_vector import TrustVector | ||
| from src.verifier_id import VerifierID | ||
|
|
||
| # import json | ||
|
|
||
| # Generate a secret key for signing | ||
| secret_key = generate_secret_key() | ||
|
|
||
| # Create an AttestationResult object | ||
| attestation_result = AttestationResult( | ||
| profile="test_profile", | ||
| issued_at=int(datetime.timestamp(datetime.now())), | ||
| verifier_id=VerifierID(developer="Acme Inc.", build="v1"), | ||
| submods={ | ||
| "submod1": Submod( | ||
| trust_vector=TrustVector(instance_identity=UNRECOGNIZED_INSTANCE_CLAIM), | ||
| status=TRUST_TIER_AFFIRMING, | ||
| ), | ||
| "submod2": Submod( | ||
| trust_vector=TrustVector(instance_identity=TRUSTWORTHY_INSTANCE_CLAIM), | ||
| status=TRUST_TIER_CONTRAINDICATED, | ||
| ), | ||
| }, | ||
| ) | ||
|
|
||
| # payload = attestation_result.encode_jwt(secret_key=secret_key) | ||
| # print(payload) | ||
|
|
||
| # decoded = AttestationResult.decode_jwt(token=payload, secret_key=secret_key) | ||
| # output_data = decoded.to_dict() | ||
|
|
||
| # with open("jwt_output.json", "w", encoding="utf-8") as f: | ||
| # json.dump(output_data, f, indent=4) | ||
|
|
||
| # print("Output successfully written to output.json") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The string name should be
"eat_profile". This is a standard [EAT claim])https://www.rfc-editor.org/rfc/rfc9711.html#name-eat_profile-eat-profile-claim). The same goes for other claims as well please check you're using the correct name.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done