From 243d22500adf797259208676977116d00e879025 Mon Sep 17 00:00:00 2001 From: Ryan McLean Date: Fri, 16 Jan 2026 12:52:12 +0000 Subject: [PATCH 1/2] Added Initial Docs --- README.md | 14 + docs/api-reference.md | 228 +++++++++++++ docs/examples.md | 108 ++++++ docs/integration-guide.md | 134 ++++++++ src/scp_sdk/core/export.py | 24 +- src/scp_sdk/core/graph.py | 43 ++- src/scp_sdk/core/models.py | 470 +++++++++++++++++++++------ src/scp_sdk/integrations/config.py | 55 +++- src/scp_sdk/integrations/registry.py | 35 +- src/scp_sdk/integrations/utils.py | 58 +++- src/scp_sdk/testing/cli.py | 13 +- src/scp_sdk/testing/fixtures.py | 9 +- 12 files changed, 1040 insertions(+), 151 deletions(-) create mode 100644 docs/api-reference.md create mode 100644 docs/examples.md create mode 100644 docs/integration-guide.md diff --git a/README.md b/README.md index 2b9c31f..595a5dc 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,20 @@ Python SDK for the **System Capability Protocol** (SCP) - programmatic access to - **Integration Framework**: Reusable base classes and utilities for building integrations (60-80% code reduction) - **Fully Tested**: Comprehensive test suite with pytest +## Documentation + +- **[API Reference](docs/api-reference.md)**: Detailed API documentation for all modules. +- **[Integration Guide](docs/integration-guide.md)**: Step-by-step tutorial for building custom integrations. +- **[Examples](docs/examples.md)**: Common usage patterns and code snippets. + +## Architecture + +The SDK is composed of three main layers: + +1. **Core (`scp_sdk.core`)**: Defines the data models (`SCPManifest`, `System`, `Dependency`) and the `Graph` abstraction for analyzing architecture. +2. **Integrations (`scp_sdk.integrations`)**: Provides a framework (`IntegrationBase`, `IntegrationConfig`) for syncing SCP data to external tools (PagerDuty, ServiceNow, etc.). +3. **Utilities (`scp_sdk.utils`)**: Helper functions for tier handling and common tasks. + ## Installation ```bash diff --git a/docs/api-reference.md b/docs/api-reference.md new file mode 100644 index 0000000..1551f71 --- /dev/null +++ b/docs/api-reference.md @@ -0,0 +1,228 @@ +# SCP SDK API Reference + +This document provides a comprehensive reference for the SCP SDK's public API. + +## Core Models + +The core data models are defined in `scp_sdk.core.models`. These Pydantic models represent the structure of SCP manifests. + +### SCPManifest + +Root model representing a complete `scp.yaml` file. + +```python +from scp_sdk.core.models import SCPManifest + +# Properties +urn: str # System URN, e.g., "urn:scp:payment-service" +name: str # System name +``` + +### System + +Core system identification. + +```python +from scp_sdk.core.models import System + +system = System( + urn="urn:scp:payment-service", + name="Payment Service", + description="Handles payment processing", + version="1.0.0" +) +``` + +**Fields:** + +- `urn` (str): Unique Resource Name. Must match `^urn:scp:[a-z0-9-]+(:[a-z0-9-]+)?$`. +- `name` (str): Human-readable name. +- `description` (str | None): Detailed description. +- `version` (str | None): Semantic version. +- `classification` (Classification | None): System metadata (tier, domain). + +### Dependency + +Represents a dependency on another system. + +```python +from scp_sdk.core.models import Dependency, RetryConfig + +dep = Dependency( + system="urn:scp:auth-service", + criticality="required", + retry=RetryConfig(max_attempts=3) +) +``` + +**Fields:** + +- `system` (str): URN of the provider system. +- `capability` (str | None): Specific capability being consumed. +- `type` (str): Interaction type (rest, grpc, event, etc.). +- `criticality` (str): Impact of failure ('required', 'degraded', 'optional'). +- `failure_mode` (str | None): Expected failure behavior (fail-fast, circuit-break, etc.). +- `timeout_ms` (int | None): Client-side timeout. +- `retry` (RetryConfig | None): Retry policy. +- `circuit_breaker` (CircuitBreakerConfig | None): Circuit breaker settings. + +### Capability + +Represents a capability provided by the system. + +```python +from scp_sdk.core.models import Capability + +cap = Capability( + capability="process-payment", + type="rest", + description="Process credit card payments" +) +``` + +**Fields:** + +- `capability` (str): Unique name within the system. +- `type` (str): Protocol type. +- `contract` (Contract | None): API contract reference. +- `sla` (SLA | None): Service Level Agreement. +- `x-security` (SecurityExtension | None): Security metadata. + +--- + +## Graph API + +The `Graph` class provides an interface for analyzing system dependencies. + +```python +from scp_sdk import Graph, Manifest + +# Load from manifests +manifests = [Manifest.from_file("scp.yaml")] +graph = Graph.from_manifests(manifests) + +# Or load from export file +graph = Graph.from_file("graph.json") +``` + +### Graph Methods + +#### `find_system(urn: str) -> SystemNode | None` + +Find a system node by its URN. + +#### `dependencies_of(system: SystemNode | str) -> list[DependencyEdge]` + +Get all outbound dependencies (systems that this system depends on). + +#### `dependents_of(system: SystemNode | str) -> list[DependencyEdge]` + +Get all inbound dependencies (systems that depend on this system). This represents the "blast radius" or impact of a failure. + +#### `validate() -> list[ValidationIssue]` + +Check the graph for consistency issues like broken links or cycles. + +### SystemNode + +Simplified view of a system in the graph. + +**Fields:** + +- `urn`: System URN. +- `name`: System name. +- `tier`: System tier (1-5). +- `domain`: Business domain. +- `team`: Owning team. + +### DependencyEdge + +Represents a connection between two systems. + +**Fields:** + +- `from_urn`: Consumer system URN. +- `to_urn`: Provider system URN. +- `criticality`: Dependency criticality. + +--- + +## Export / Import + +Functions for working with the unified JSON graph format. + +### `export_graph_json(manifests: list[SCPManifest]) -> dict` + +Export a list of manifests to the unified graph JSON format. This format is suitable for ingestion by visualization tools or other integrations. + +**Structure:** + +```json +{ + "nodes": [{"id": "...", "type": "System", ...}], + "edges": [{"from": "...", "to": "...", "type": "DEPENDS_ON", ...}], + "meta": {"systems_count": 10, ...} +} +``` + +### `import_graph_json(data: dict) -> list[SCPManifest]` + +Reconstruct `SCPManifest` objects from the graph JSON data. Note that some fidelity may be lost (e.g., specific file paths are not preserved). + +--- + +## Integration Framework + +Base classes for building SCP integrations. + +### IntegrationConfig + +Standard configuration model for all integrations. + +**Fields:** + +- `name` (str): Integration name. +- `auth` (AuthConfig): Credentials. +- `field_mappings` (dict): Custom field mapping. +- `batch_size` (int): Processing batch size. + +### IntegrationBase + +Abstract base class for integrations. Subclasses must implement: + +- `sync_system(system: SystemNode) -> IntegrationResult` +- `sync_dependency(edge: DependencyEdge) -> IntegrationResult` + +--- + +## Testing Utilities + +Helpers for testing SCP-based applications. + +### GraphFixture + +Factory for creating test graphs. + +- `simple_graph()`: Returns a graph with one system. +- `with_dependencies(n)`: Returns a chain of `n` systems. +- `invalid_graph(issue)`: Returns a broken graph for testing validation. + +### CLITestHelper + +Helper for testing Typer CLI commands. + +```python +helper = CLITestHelper(app) +result = helper.run(["sync", "--dry-run"]) +helper.assert_success(result) +``` + +## Utilities + +### TierUtils + +Utilities for SCP tier classifications (1-5). + +- `get_name(tier)`: Convert integer tier to name (Critical, High, etc.). +- `validate_tier(tier)`: Check if an integer is a valid tier. +- `map_tier(tier, mapping)`: Map tier to external system values. diff --git a/docs/examples.md b/docs/examples.md new file mode 100644 index 0000000..22fb5eb --- /dev/null +++ b/docs/examples.md @@ -0,0 +1,108 @@ +# SCP SDK Examples + +Common patterns and examples for using the SCP SDK. + +## Loading a Manifest + +Load a single `scp.yaml` file into a python object. + +```python +from scp_sdk import Manifest + +# Check if file is valid manifest first +try: + manifest = Manifest.from_file("scp.yaml") + print(f"Loaded system: {manifest.name} ({manifest.urn})") +except Exception as e: + print(f"Invalid manifest: {e}") +``` + +## Traversing the Graph + +Load multiple manifests and build an architecture graph. + +```python +from pathlib import Path +from scp_sdk import Manifest, Graph + +# 1. Load all manifests in directory +manifests = [] +for path in Path("./services").rglob("scp.yaml"): + manifests.append(Manifest.from_file(path)) + +# 2. Build graph +graph = Graph.from_manifests(manifests) +print(f"Graph built with {len(graph)} systems") + +# 3. Find specific system +payment = graph.find_system("urn:scp:payment-service") + +if payment: + # 4. List dependencies (Outbound) + print("\nDependencies:") + for edge in graph.dependencies_of(payment): + print(f"- Depends on {edge.to_urn} ({edge.criticality})") + + # 5. List dependents (Inbound / "Blast Radius") + print("\nDependents (Blast Radius):") + for edge in graph.dependents_of(payment): + print(f"- Used by {edge.from_urn}") +``` + +## Validating the Architecture + +Check for missing dependencies or other structural issues. + +```python +from scp_sdk import Graph + +graph = Graph.from_file("graph.json") +issues = graph.validate() + +errors = [i for i in issues if i.severity == "error"] +warnings = [i for i in issues if i.severity == "warning"] + +if errors: + print(f"Found {len(errors)} errors!") + for err in errors: + print(f"[ERROR] {err.message} ({err.code})") + +if warnings: + print(f"Found {len(warnings)} warnings.") +``` + +## Exporting for Visualization + +Export the graph to JSON for use with visualization tools. + +```python +from scp_sdk import Graph, export_graph_json +import json + +# ... build graph ... + +# Export to dictionary +graph_data = graph.to_dict() + +# Save to file +graph.to_json("architecture.json") +``` + +## Creating a Manifest Programmatically + +You can create manifests in code, which is useful for migration scripts. + +```python +from scp_sdk.core.models import SCPManifest, System, Classification + +manifest = SCPManifest( + scp="0.1.0", + system=System( + urn="urn:scp:generated-service", + name="Generated Service", + classification=Classification(tier=2, domain="generated") + ) +) + +print(manifest.to_yaml()) +``` diff --git a/docs/integration-guide.md b/docs/integration-guide.md new file mode 100644 index 0000000..ef413c4 --- /dev/null +++ b/docs/integration-guide.md @@ -0,0 +1,134 @@ +# Building SCP Integrations + +This guide walks you through the process of creating a new integration for the **System Capability Protocol (SCP)** using the `scp-sdk`. + +Integrations allow you to sync your SCP architecture graph with external tools like PagerDuty, ServiceNow, Backstage, or OpsGenie. + +## Overview + +An integration consists of three main parts: + +1. **Registration**: A class decorated with `@register_integration`. +2. **Configuration**: A configuration model inheriting from `IntegrationConfig`. +3. **Implementation**: A class inheriting from `IntegrationBase` that implements `sync_system` and `sync_dependency`. + +## Step 1: Create the Integration Class + +Create a new file for your integration (e.g., `my_integration.py`). + +```python +from typing import Any +from scp_sdk.integrations.base import IntegrationBase, IntegrationResult +from scp_sdk.integrations.registry import register_integration +from scp_sdk.core.graph import SystemNode, DependencyEdge + +@register_integration("my-tool") +class MyToolIntegration(IntegrationBase): + """Integration for My Tool.""" + + def sync_system(self, system: SystemNode) -> IntegrationResult: + # TODO: Implement system sync logic + return IntegrationResult(status="success", id=system.urn) + + def sync_dependency(self, edge: DependencyEdge) -> IntegrationResult: + # TODO: Implement dependency sync logic + return IntegrationResult(status="skipped") +``` + +## Step 2: Implement System Sync + +The `sync_system` method is called for each system in the graph. Your goal is to ensure the system exists in the external tool and is up-to-date. + +```python + def sync_system(self, system: SystemNode) -> IntegrationResult: + try: + # 1. Map SCP fields to vendor fields + payload = { + "name": system.name, + "description": f"Managed by SCP. URN: {system.urn}", + "tier": self._map_tier(system.tier), + } + + # 2. Call vendor API + # external_id = self.client.create_or_update_service(payload) + external_id = "svc-123" # Mocked for example + + return IntegrationResult( + status="success", + id=system.urn, + external_id=external_id + ) + + except Exception as e: + return IntegrationResult( + status="error", + id=system.urn, + message=str(e) + ) + + def _map_tier(self, tier: int | None) -> str: + # Map 1-5 scale to vendor specific values + mapping = {1: "Critical", 2: "High", 3: "Medium"} + return mapping.get(tier, "Low") +``` + +## Step 3: Implement Dependency Sync + +The `sync_dependency` method is called for each dependency edge. + +```python + def sync_dependency(self, edge: DependencyEdge) -> IntegrationResult: + # Only sync if criticality is high enough + if edge.criticality == "optional": + return IntegrationResult(status="skipped", id=f"{edge.from_urn}->{edge.to_urn}") + + # Logic to create link in external tool + return IntegrationResult(status="success", id=f"{edge.from_urn}->{edge.to_urn}") +``` + +## Step 4: Configuration + +Users configure your integration via a YAML file. define the options in `IntegrationConfig` if needed, or use the `custom` dictionary. + +**config.yaml**: + +```yaml +integration: + name: my-tool + auth: + api_key: ${MY_TOOL_API_KEY} + custom: + default_priority: P3 +``` + +Access this in your class via `self.config`. + +```python + def __init__(self, config: IntegrationConfig): + super().__init__(config) + self.api_key = config.auth.api_key + self.priority = config.custom.get("default_priority", "P3") +``` + +## Testing Your Integration + +Use `scp-sdk.testing.fixtures` to create test data. + +```python +from scp_sdk.testing.fixtures import GraphFixture +from my_integration import MyToolIntegration + +def test_sync(): + graph = GraphFixture.simple_graph() + integration = MyToolIntegration(config) + + results = integration.run(graph) + assert not results.failed +``` + +## Best Practices + +1. **Idempotency**: Your sync methods should be idempotent. Running them multiple times should not create duplicate records. +2. **Error Handling**: Catch exceptions and return `IntegrationResult` with `status="error"`. Do not let one failure crash the entire sync process. +3. **Rate Limiting**: Use `BatchProcessor` from `scp_sdk.integrations.utils` if the API has strict rate limits. +4. **Caching**: Use `IDCache` to cache URN-to-VendorID mappings to reduce API references. diff --git a/src/scp_sdk/core/export.py b/src/scp_sdk/core/export.py index ab8c404..a5c21f4 100644 --- a/src/scp_sdk/core/export.py +++ b/src/scp_sdk/core/export.py @@ -32,15 +32,19 @@ def export_graph_json(manifests: list[SCPManifest]) -> dict[str, Any]: Returns: Dictionary with 'nodes', 'edges', and 'meta' keys: - - nodes: List of system and capability nodes - - edges: List of dependency and provides edges - - meta: Counts and statistics + - nodes: List of system and capability nodes. Each node has 'id', 'type', and properties. + - edges: List of dependency and provides edges. Each edge has 'from', 'to', 'type'. + - meta: Counts and statistics (systems_count, capabilities_count, etc). Example: >>> from scp_sdk import Manifest, export_graph_json >>> manifests = [Manifest.from_file("scp.yaml")] >>> graph_data = export_graph_json(manifests) - >>> print(graph_data["meta"]["systems_count"]) + >>> + >>> # Save to file + >>> import json + >>> with open("graph.json", "w") as f: + >>> json.dump(graph_data, f, indent=2) """ nodes: list[dict] = [] edges: list[dict] = [] @@ -139,22 +143,28 @@ def import_graph_json(data: dict[str, Any]) -> list[SCPManifest]: enabling transformation workflows without re-scanning source manifests. Stub nodes (external dependencies) are ignored during reconstruction. + Note: Some data loss is possible if the export format doesn't capture + every field of the original manifest (e.g., complex failure mode thresholds). Args: - data: Dictionary from export_graph_json() output + data: Dictionary from export_graph_json() output, containing 'nodes' and 'edges' Returns: List of reconstructed SCP manifests Raises: - ValueError: If data format is invalid + ValueError: If data format is invalid or missing required fields Example: >>> import json + >>> from scp_sdk import import_graph_json + >>> >>> with open("graph.json") as f: >>> data = json.load(f) + >>> >>> manifests = import_graph_json(data) - >>> print(len(manifests)) + >>> for manifest in manifests: + >>> print(f"Loaded {manifest.system.name}") """ manifests: list[SCPManifest] = [] nodes = data.get("nodes", []) diff --git a/src/scp_sdk/core/graph.py b/src/scp_sdk/core/graph.py index f1d3db2..e203cd4 100644 --- a/src/scp_sdk/core/graph.py +++ b/src/scp_sdk/core/graph.py @@ -11,7 +11,11 @@ @dataclass class SystemNode: - """A system node in the architecture graph.""" + """A system node in the architecture graph. + + Represents a simplified view of a system optimized for graph traversal and analysis. + Derived from the richer System model in the manifest. + """ urn: str name: str @@ -60,7 +64,11 @@ def from_manifest(cls, manifest: Manifest) -> "SystemNode": @dataclass class DependencyEdge: - """A dependency edge in the architecture graph.""" + """A dependency edge in the architecture graph. + + Represents a directed edge from one system (consumer) to another (provider). + Carries metadata about the relationship like criticality and failure modes. + """ from_urn: str to_urn: str @@ -96,12 +104,26 @@ class Graph: """Architecture graph built from SCP manifests. Provides efficient querying and traversal of system dependencies. + Supports loading from unified JSON format or building directly from manifests. Example: + >>> # Load from file >>> graph = Graph.from_file("graph.json") - >>> for system in graph.systems(): - ... print(system.name) - >>> deps = graph.dependencies_of(system) + >>> + >>> # Build from manifests + >>> manifests = [Manifest.from_file("scp.yaml")] + >>> graph = Graph.from_manifests(manifests) + >>> + >>> # Find a system + >>> payment = graph.find_system("urn:scp:payment-service") + >>> + >>> # Get dependencies (outbound edges) + >>> deps = graph.dependencies_of(payment) + >>> for edge in deps: + >>> print(f"Depends on {edge.to_urn} ({edge.criticality})") + >>> + >>> # Get blast radius (inbound edges) + >>> dependents = graph.dependents_of(payment) """ def __init__(self, systems: list[SystemNode], edges: list[DependencyEdge]): @@ -243,7 +265,7 @@ def find_system(self, urn: str) -> SystemNode | None: """Find a system by URN. Args: - urn: System URN to search for + urn: System URN to search for (e.g., "urn:scp:payment-service") Returns: SystemNode if found, None otherwise @@ -253,8 +275,10 @@ def find_system(self, urn: str) -> SystemNode | None: def dependencies_of(self, system: SystemNode | str) -> list[DependencyEdge]: """Get outbound dependencies of a system. + This returns edges where the given system is the CONSUMER (from_urn). + Args: - system: SystemNode or URN string + system: SystemNode object or URN string Returns: List of outbound dependency edges @@ -335,8 +359,11 @@ def validate(self) -> list[ValidationIssue]: def dependents_of(self, system: SystemNode | str) -> list[DependencyEdge]: """Get systems that depend on this system (blast radius). + This returns edges where the given system is the PROVIDER (to_urn). + Useful for analyzing impact of a system failure or change. + Args: - system: SystemNode or URN string + system: SystemNode object or URN string Returns: List of inbound dependency edges diff --git a/src/scp_sdk/core/models.py b/src/scp_sdk/core/models.py index c2bce7a..43a9ee3 100644 --- a/src/scp_sdk/core/models.py +++ b/src/scp_sdk/core/models.py @@ -13,92 +13,198 @@ class Contact(BaseModel): - """Contact channel for a team.""" + """Contact channel for a team. - type: Literal["oncall", "slack", "email", "teams", "pagerduty", "opsgenie"] - ref: str + Defines how to reach a team through various communication channels. + Used in ownership section for incident response and escalation. + """ + + type: Literal["oncall", "slack", "email", "teams", "pagerduty", "opsgenie"] = Field( + description="Type of contact channel (oncall, slack, email, teams, pagerduty, opsgenie)" + ) + ref: str = Field( + description="Reference identifier for the contact (e.g., Slack channel ID, email address, PagerDuty service ID)" + ) class Contract(BaseModel): - """API contract specification reference.""" + """API contract specification reference. + + Points to machine-readable API specifications for capabilities. + Enables automated validation, code generation, and contract testing. + """ type: ( Literal["openapi", "asyncapi", "protobuf", "graphql", "avro", "jsonschema"] | None - ) = None - ref: str | None = None + ) = Field(default=None, description="Type of contract specification format") + ref: str | None = Field( + default=None, + description="URI or path to the contract file (e.g., './openapi.yaml', 'https://api.example.com/spec')", + ) class SLA(BaseModel): - """Service level agreement targets.""" + """Service level agreement targets. - availability: str | None = None # e.g., "99.95%" - latency_p50_ms: int | None = None - latency_p99_ms: int | None = None - throughput_rps: int | None = None + Defines performance and reliability commitments for a capability. + Used for monitoring, alerting, and capacity planning. + """ + + availability: str | None = Field( + default=None, + description="Target availability percentage (e.g., '99.95%', '99.9%')", + ) + latency_p50_ms: int | None = Field( + default=None, + description="Median (50th percentile) latency target in milliseconds", + ) + latency_p99_ms: int | None = Field( + default=None, description="99th percentile latency target in milliseconds" + ) + throughput_rps: int | None = Field( + default=None, description="Target throughput in requests per second" + ) class RetryConfig(BaseModel): - """Retry configuration for dependencies.""" + """Retry configuration for dependencies. + + Defines how a system should retry failed requests to a dependency. + Part of resilience engineering best practices. + """ - max_attempts: int | None = None - backoff: Literal["none", "linear", "exponential"] | None = None + max_attempts: int | None = Field( + default=None, description="Maximum number of retry attempts before giving up" + ) + backoff: Literal["none", "linear", "exponential"] | None = Field( + default=None, + description="Backoff strategy between retries (none, linear, exponential)", + ) class CircuitBreakerConfig(BaseModel): - """Circuit breaker configuration.""" + """Circuit breaker configuration. - failure_threshold: int | None = None - reset_timeout_ms: int | None = None + Implements the circuit breaker pattern to prevent cascading failures. + When failure threshold is met, the circuit opens and fails fast. + """ + + failure_threshold: int | None = Field( + default=None, + description="Number of consecutive failures before opening the circuit", + ) + reset_timeout_ms: int | None = Field( + default=None, + description="Time in milliseconds to wait before attempting to close the circuit", + ) class SecurityConstraints(BaseModel): - """Security-related constraints.""" + """Security-related constraints. + + Defines security requirements and configurations for the system. + Used for compliance validation and security posture assessment. + """ - authentication: list[str] | None = None - data_classification: str | None = None - encryption: dict[str, bool] | None = None # at_rest, in_transit + authentication: list[str] | None = Field( + default=None, + description="Required authentication methods (e.g., ['oauth2', 'mTLS', 'api-key'])", + ) + data_classification: str | None = Field( + default=None, + description="Data classification level (e.g., 'public', 'internal', 'confidential', 'restricted')", + ) + encryption: dict[str, bool] | None = Field( + default=None, + description="Encryption requirements with keys 'at_rest' and 'in_transit'", + ) class ComplianceConstraints(BaseModel): - """Compliance-related constraints.""" + """Compliance-related constraints. - frameworks: list[str] | None = None - data_residency: list[str] | None = None - retention_days: int | None = None + Defines regulatory and compliance requirements. + Used for audit trails and compliance reporting. + """ + + frameworks: list[str] | None = Field( + default=None, + description="Applicable compliance frameworks (e.g., ['SOC2', 'HIPAA', 'GDPR', 'PCI-DSS'])", + ) + data_residency: list[str] | None = Field( + default=None, + description="Required data residency regions (e.g., ['US', 'EU', 'APAC'])", + ) + retention_days: int | None = Field( + default=None, description="Data retention period in days" + ) class OperationalConstraints(BaseModel): - """Operational constraints.""" + """Operational constraints. + + Defines operational requirements and limitations. + Used for capacity planning and deployment automation. + """ - max_replicas: int | None = None - min_replicas: int | None = None - deployment_windows: list[str] | None = None + max_replicas: int | None = Field( + default=None, description="Maximum number of instances/pods allowed" + ) + min_replicas: int | None = Field( + default=None, description="Minimum number of instances/pods required" + ) + deployment_windows: list[str] | None = Field( + default=None, + description="Allowed deployment time windows (e.g., ['mon-fri 02:00-04:00 UTC'])", + ) class KubernetesRuntime(BaseModel): - """Kubernetes deployment info.""" + """Kubernetes deployment information. - namespace: str | None = None - deployment: str | None = None - service: str | None = None + Identifies Kubernetes resources for the system. + Used for runtime discovery and monitoring integration. + """ + + namespace: str | None = Field( + default=None, description="Kubernetes namespace where the system is deployed" + ) + deployment: str | None = Field( + default=None, description="Name of the Kubernetes Deployment resource" + ) + service: str | None = Field( + default=None, description="Name of the Kubernetes Service resource" + ) class AWSRuntime(BaseModel): - """AWS deployment info.""" + """AWS deployment information. + + Identifies AWS resources for the system. + Used for cloud resource discovery and cost allocation. + """ - account_id: str | None = None - region: str | None = None - arn: str | None = None + account_id: str | None = Field(default=None, description="AWS Account ID") + region: str | None = Field( + default=None, description="AWS Region (e.g., 'us-west-2')" + ) + arn: str | None = Field(default=None, description="Resource ARN") class Environment(BaseModel): """Runtime environment configuration.""" - otel_service_name: str | None = None - endpoints: list[str] | None = None - kubernetes: KubernetesRuntime | None = None - aws: AWSRuntime | None = None + otel_service_name: str | None = Field( + default=None, description="OpenTelemetry service name for this environment" + ) + endpoints: list[str] | None = Field( + default=None, description="Public or internal endpoints (URLs)" + ) + kubernetes: KubernetesRuntime | None = Field( + default=None, description="Kubernetes deployment details" + ) + aws: AWSRuntime | None = Field(default=None, description="AWS deployment details") class FailureModeThresholds(BaseModel): @@ -115,9 +221,16 @@ class SecurityExtension(BaseModel): enabling SOAR autodiscovery of security controls. """ - actuator_profile: str | None = None # e.g., "edr", "siem", "slpf" - actions: list[str] = [] # e.g., "query", "contain", "deny" - targets: list[str] = [] # e.g., "device", "ipv4_net", "file" + actuator_profile: str | None = Field( + default=None, + description="Security actuator profile (e.g., 'edr', 'siem', 'slpf')", + ) + actions: list[str] = Field( + default=[], description="Supported actions (e.g., 'query', 'contain', 'deny')" + ) + targets: list[str] = Field( + default=[], description="Supported targets (e.g., 'device', 'ipv4_net', 'file')" + ) # ============================================================================ @@ -126,92 +239,225 @@ class SecurityExtension(BaseModel): class Classification(BaseModel): - """System classification metadata.""" + """System classification metadata. - tier: int | None = Field(None, ge=1, le=5) - domain: str | None = None - tags: list[str] | None = None + Categorizes systems by criticality,domain, and other attributes. + Used for prioritization, resource allocation, and impact analysis. + """ + + tier: int | None = Field( + default=None, + ge=1, + le=5, + description="Criticality tier (1-5), where 1 is most critical", + ) + domain: str | None = Field( + default=None, + description="Domain or bounded context (e.g., 'payments', 'identity')", + ) + tags: list[str] | None = Field( + default=None, description="Arbitrary labels for filtering and categorization" + ) class System(BaseModel): """Core system identification.""" - urn: str = Field(..., pattern=r"^urn:scp:[a-z0-9-]+(:[a-z0-9-]+)?$") - name: str - description: str | None = None - version: str | None = None - classification: Classification | None = None + urn: str = Field( + ..., + pattern=r"^urn:scp:[a-z0-9-]+(:[a-z0-9-]+)?$", + description="Unique resource name following pattern 'urn:scp:service-name' or 'urn:scp:namespace:service-name'", + ) + name: str = Field(description="Human-readable system name") + description: str | None = Field( + default=None, + description="Brief description of the system's purpose and functionality", + ) + version: str | None = Field( + default=None, description="System version (e.g., '1.0.0', '2.3.1-beta')" + ) + classification: Classification | None = Field( + default=None, + description="Classification metadata including tier, domain, and tags", + ) class Ownership(BaseModel): - """Team ownership and contact information.""" + """Team ownership and contact information. + + Defines who is responsible for the system and how to reach them. + Critical for incident response and operational excellence. + """ - team: str - contacts: list[Contact] | None = None - escalation: list[str] | None = None + team: str = Field(description="Team name or identifier responsible for the system") + contacts: list[Contact] | None = Field( + default=None, + description="Contact channels for reaching the team (Slack, PagerDuty, email, etc.)", + ) + escalation: list[str] | None = Field( + default=None, description="Escalation chain, ordered list of contacts or teams" + ) class Capability(BaseModel): - """A capability provided by the system.""" + """A capability provided by the system. + + Represents a service, API, event stream, or data store that the system provides. + Each capability can have contracts, SLAs, and security metadata. + + Example: + >>> capability = Capability( + ... capability="process-payment", + ... type="rest", + ... contract=Contract(type="openapi", ref="./openapi.yaml"), + ... sla=SLA(availability="99.95%", latency_p99_ms=500) + ... ) + """ model_config = {"populate_by_name": True} - capability: str - type: Literal["rest", "grpc", "graphql", "event", "data", "stream"] - contract: Contract | None = None - sla: SLA | None = None - topics: list[str] | None = None # For event types - x_security: SecurityExtension | None = Field(None, alias="x-security") + capability: str = Field( + description="Unique capability identifier (e.g., 'user-authentication', 'payment-processing')" + ) + type: Literal["rest", "grpc", "graphql", "event", "data", "stream"] = Field( + description="Type of capability: rest (HTTP API), grpc, graphql, event (pub/sub), data (database), or stream" + ) + contract: Contract | None = Field( + default=None, description="Machine-readable API contract specification" + ) + sla: SLA | None = Field( + default=None, description="Service level agreement targets for this capability" + ) + topics: list[str] | None = Field( + default=None, + description="Event topics or data streams (relevant for 'event' and 'stream' types)", + ) + x_security: SecurityExtension | None = Field( + default=None, + alias="x-security", + description="Security capability metadata for SOAR autodiscovery (OpenC2-inspired)", + ) class Dependency(BaseModel): - """A dependency on another system.""" + """A dependency on another system. + + Defines a relationship where this system relies on another system's capability. + Includes resilience patterns (retry, circuit breaker) and criticality. + + Example: + >>> dep = Dependency( + ... system="urn:scp:auth-service", + ... capability="verify-token", + ... type="rest", + ... criticality="required", + ... timeout_ms=500, + ... retry=RetryConfig(max_attempts=3, backoff="exponential") + ... ) + """ - system: str = Field(..., pattern=r"^urn:scp:[a-z0-9-]+(:[a-z0-9-]+)?$") - capability: str | None = None - type: Literal["rest", "grpc", "graphql", "event", "data", "stream"] - criticality: Literal["required", "degraded", "optional"] + system: str = Field( + ..., + pattern=r"^urn:scp:[a-z0-9-]+(:[a-z0-9-]+)?$", + description="URN of the system being depended on", + ) + capability: str | None = Field( + default=None, + description="Specific capability being consumed (optional but recommended)", + ) + type: Literal["rest", "grpc", "graphql", "event", "data", "stream"] = Field( + description="Type of interaction (must match the provided capability type)" + ) + criticality: Literal["required", "degraded", "optional"] = Field( + description="Impact if this dependency fails (required=outage, degraded=reduced functionality, optional=no impact)" + ) failure_mode: ( Literal["fail-fast", "circuit-break", "fallback", "queue-buffer", "retry"] | None - ) = None - timeout_ms: int | None = None - retry: RetryConfig | None = None - circuit_breaker: CircuitBreakerConfig | None = None - topics: list[str] | None = None - access: Literal["read", "write", "read-write"] | None = None + ) = Field(default=None, description="Expected behavior when dependency fails") + timeout_ms: int | None = Field( + default=None, description="Client-side timeout in milliseconds" + ) + retry: RetryConfig | None = Field( + default=None, description="Retry policy configuration" + ) + circuit_breaker: CircuitBreakerConfig | None = Field( + default=None, description="Circuit breaker configuration" + ) + topics: list[str] | None = Field( + default=None, + description="Specific topics or streams consumed (for event/stream types)", + ) + access: Literal["read", "write", "read-write"] | None = Field( + default=None, + description="Data access level (mainly for data/database dependencies)", + ) class Constraints(BaseModel): - """System constraints.""" + """System constraints. + + Groups all constraint-related configurations covering security, compliance, + and operational requirements. + """ - security: SecurityConstraints | None = None - compliance: ComplianceConstraints | None = None - operational: OperationalConstraints | None = None + security: SecurityConstraints | None = Field( + default=None, description="Security constraints and requirements" + ) + compliance: ComplianceConstraints | None = Field( + default=None, description="Compliance and regulatory requirements" + ) + operational: OperationalConstraints | None = Field( + default=None, description="Operational limits and requirements" + ) class Runtime(BaseModel): - """Runtime environment configurations.""" + """Runtime environment configurations. - environments: dict[str, Environment] | None = None + Maps deployment environments (e.g., 'production', 'staging') to their + specific configurations. + """ + + environments: dict[str, Environment] | None = Field( + default=None, + description="Dictionary mapping environment names to their configurations", + ) class FailureMode(BaseModel): - """Known failure mode and its characteristics.""" + """Known failure mode and its characteristics. + + Documents specific ways the system might fail and the expected impact. + Essential for failure mode and effects analysis (FMEA) and game days. + """ - mode: str + mode: str = Field( + description="Name or title of the failure mode (e.g., 'database-latency-spike', 'cache-miss-storm')" + ) impact: Literal[ "total-outage", "partial-outage", "degraded-experience", "data-inconsistency", "silent-failure", - ] - detection: str | None = None - recovery: str | None = None - degraded_behavior: str | None = None - mttr_target_minutes: int | None = None - thresholds: FailureModeThresholds | None = None + ] = Field(description="Severity of the impact on the system's users") + detection: str | None = Field( + default=None, description="How this failure is detected (alerts, metrics, logs)" + ) + recovery: str | None = Field( + default=None, description="Automated or manual recovery steps" + ) + degraded_behavior: str | None = Field( + default=None, + description="Description of how the system behaves while in this failure mode", + ) + mttr_target_minutes: int | None = Field( + default=None, description="Target Mean Time To Recovery in minutes" + ) + thresholds: FailureModeThresholds | None = Field( + default=None, description="Metric thresholds defining this failure mode" + ) # ============================================================================ @@ -223,16 +469,30 @@ class SCPManifest(BaseModel): """Root SCP manifest model. This represents a complete scp.yaml file. + It aggregates all information about a system into a single document. """ - scp: str # Version, e.g., "0.1.0" - system: System - ownership: Ownership | None = None - provides: list[Capability] | None = None - depends: list[Dependency] | None = None - constraints: Constraints | None = None - runtime: Runtime | None = None - failure_modes: list[FailureMode] | None = None + scp: str = Field(description="SCP schema version (e.g., '0.1.0')") + system: System = Field(description="Core system identification") + ownership: Ownership | None = Field( + default=None, description="Team ownership and contacts" + ) + provides: list[Capability] | None = Field( + default=None, description="List of capabilities provided by this system" + ) + depends: list[Dependency] | None = Field( + default=None, description="List of dependencies on other systems" + ) + constraints: Constraints | None = Field( + default=None, + description="System constraints (security, compliance, operational)", + ) + runtime: Runtime | None = Field( + default=None, description="Runtime environment configurations" + ) + failure_modes: list[FailureMode] | None = Field( + default=None, description="Known failure modes and analysis" + ) @property def urn(self) -> str: @@ -258,9 +518,17 @@ class ValidationIssue(BaseModel): """Graph validation issue. Used by Graph.validate() to report structural or semantic problems. + Can represent errors (broken graph), warnings, or informational notices. """ - severity: Literal["error", "warning", "info"] - code: str # e.g., "MISSING_DEPENDENCY_TARGET" - message: str - context: dict[str, str] = {} # URN, edge details, etc. + severity: Literal["error", "warning", "info"] = Field( + description="Severity level of the issue" + ) + code: str = Field( + description="Unique error code (e.g., 'MISSING_DEPENDENCY_TARGET')" + ) + message: str = Field(description="Human-readable description of the issue") + context: dict[str, str] = Field( + default={}, + description="Additional context for debugging (e.g., affected URNs, edge details)", + ) diff --git a/src/scp_sdk/integrations/config.py b/src/scp_sdk/integrations/config.py index 0451a46..2a8fbcc 100644 --- a/src/scp_sdk/integrations/config.py +++ b/src/scp_sdk/integrations/config.py @@ -10,7 +10,20 @@ class AuthConfig(BaseModel): - """Authentication configuration.""" + """Authentication configuration. + + Flexible schema supporting various authentication methods. + Only fields relevant to the specific integration need to be populated. + + Example: + >>> auth = AuthConfig(api_key="secret-key") + >>> # or + >>> auth = AuthConfig( + ... client_id="id", + ... client_secret="secret", + ... token="oauth-token" + ... ) + """ api_key: str | None = None username: str | None = None @@ -21,16 +34,34 @@ class AuthConfig(BaseModel): class IntegrationConfig(BaseModel): - """Standard configuration for integrations.""" + """Standard configuration for integrations. + + Provides a consistent configuration structure across all integrations. + Supports field mapping, authentication, and resilience settings. + + Example: + >>> config = IntegrationConfig( + ... name="pagerduty", + ... auth=AuthConfig(api_key="..."), + ... batch_size=50, + ... custom={"default_severity": "critical"} + ... ) + """ - name: str - vendor: str | None = None - field_mappings: dict[str, Any] = Field(default_factory=dict) - auth: AuthConfig | None = None - batch_size: int = 100 - timeout_seconds: int = 30 - retry_attempts: int = 3 - custom: dict[str, Any] = Field(default_factory=dict) + name: str = Field(description="Integration name (must match registered name)") + vendor: str | None = Field( + None, description="Optional vendor name if different from integration name" + ) + field_mappings: dict[str, Any] = Field( + default_factory=dict, description="Map of vendor fields to SCP fields" + ) + auth: AuthConfig | None = Field(None, description="Authentication credentials") + batch_size: int = Field(100, description="Items to process per batch") + timeout_seconds: int = Field(30, description="API timeout in seconds") + retry_attempts: int = Field(3, description="Number of retries for failed requests") + custom: dict[str, Any] = Field( + default_factory=dict, description="Integration-specific custom configuration" + ) def substitute_env_vars(data: Any) -> Any: @@ -88,11 +119,11 @@ def load_config(path: Path | str) -> IntegrationConfig: # Extract integration section if present if "integration" in data: config_data = data["integration"] - + # Move auth if at root level if "auth" in data and "auth" not in config_data: config_data["auth"] = data["auth"] - + # Move field_mappings if at root level if "field_mappings" in data and "field_mappings" not in config_data: config_data["field_mappings"] = data["field_mappings"] diff --git a/src/scp_sdk/integrations/registry.py b/src/scp_sdk/integrations/registry.py index d8671f2..25f9591 100644 --- a/src/scp_sdk/integrations/registry.py +++ b/src/scp_sdk/integrations/registry.py @@ -11,16 +11,19 @@ def register_integration(name: str) -> Callable: """Decorator to register an integration plugin. - Example: - @register_integration("pagerduty") - class PagerDutyIntegration(IntegrationBase): - ... + This decorator adds the decorated class to the global integration registry, + allowing it to be instantiated by name via `get_integration()`. Args: - name: Integration name/identifier + name: Unique identifier for the integration (e.g., "pagerduty", "servicenow") Returns: - Decorator function + Decorator function that registers the class + + Example: + >>> @register_integration("my-integration") + >>> class MyIntegration(IntegrationBase): + >>> ... """ def decorator(cls: type) -> type: @@ -36,10 +39,15 @@ def get_integration(name: str) -> type | None: """Get a registered integration class by name. Args: - name: Integration name + name: Integration name to look up Returns: - Integration class or None if not found + Integration class if found, None otherwise + + Example: + >>> cls = get_integration("pagerduty") + >>> if cls: + >>> integration = cls(config) """ return _REGISTRY.get(name) @@ -47,12 +55,19 @@ def get_integration(name: str) -> type | None: def list_integrations() -> list[str]: """List all registered integration names. + Useful for discovery (e.g., showing available integrations in a CLI). + The list is sorted to ensure deterministic output. + Returns: - List of integration names + List of registered integration names (sorted) """ return list(_REGISTRY.keys()) def clear_registry() -> None: - """Clear all registered integrations (mainly for testing).""" + """Clear all registered integrations. + + WARNING: This removes all integrations from the internal registry. + Primarily used for testing isolation to ensure a clean state between tests. + """ _REGISTRY.clear() diff --git a/src/scp_sdk/integrations/utils.py b/src/scp_sdk/integrations/utils.py index 449c8b6..48a3fa0 100644 --- a/src/scp_sdk/integrations/utils.py +++ b/src/scp_sdk/integrations/utils.py @@ -5,7 +5,19 @@ class FieldMapper: - """Maps SCP fields to vendor-specific fields.""" + """Maps SCP fields to vendor-specific fields. + + Handles translation between the standardized SCP schema and vendor-specific + data models. Supports direct mapping and basic nesting. + + Example: + >>> mapping = { + ... "vendor_name": "system.name", + ... "vendor_tier": "system.classification.tier" + ... } + >>> mapper = FieldMapper(mapping) + >>> result = mapper.map_fields(scp_data) + """ def __init__(self, mapping: dict[str, str | list[str]]): """Initialize with field mapping configuration. @@ -13,7 +25,7 @@ def __init__(self, mapping: dict[str, str | list[str]]): Args: mapping: Dictionary mapping vendor fields to SCP fields Values can be: - - string: direct SCP field name + - string: dot-notation path to SCP field (e.g. "system.name") - list: multiple SCP fields (combined strategy varies) """ self.mapping = mapping @@ -63,7 +75,19 @@ def extract_value(self, scp_data: dict[str, Any], path: str) -> Any: class IDCache: - """Cache URN to vendor ID mappings.""" + """Cache URN to vendor ID mappings. + + Minimizes API calls by caching resolved vendor IDs. + Essential for performance when syncing large graphs. + + Example: + >>> cache = IDCache() + >>> # Get with fetch fallback + >>> vendor_id = cache.get_or_fetch( + ... urn="urn:scp:svc", + ... fetch_fn=lambda u: api.lookup(u) + ... ) + """ def __init__(self): """Initialize empty cache.""" @@ -89,7 +113,9 @@ def set(self, urn: str, vendor_id: str) -> None: """ self._cache[urn] = vendor_id - def get_or_fetch(self, urn: str, fetch_fn: Callable[[str], str | None]) -> str | None: + def get_or_fetch( + self, urn: str, fetch_fn: Callable[[str], str | None] + ) -> str | None: """Get cached ID or fetch if not cached. Args: @@ -114,7 +140,15 @@ def clear(self) -> None: class BatchProcessor: - """Process items in batches with rate limiting.""" + """Process items in batches with rate limiting. + + Utility to handle API rate limits and optimize throughput. + Groups items into chunks and optionally sleeps between chunks. + + Example: + >>> processor = BatchProcessor(batch_size=50, delay_seconds=0.5) + >>> processor.process(items, push_to_api) + """ def __init__(self, batch_size: int = 100, delay_seconds: float = 0): """Initialize batch processor. @@ -147,7 +181,19 @@ def process( class CommentBuilder: - """Build formatted comment/description fields.""" + """Build formatted comment/description fields. + + Generates standardized metadata blocks for vendor description fields. + Useful for ensuring SCP owners and contact info appear in vendor tools. + + Example: + >>> builder = CommentBuilder() + >>> description = builder.build(system_data) + >>> # Result: + >>> # SCP Metadata: + >>> # Team: Checkout + >>> # Domain: Payments + """ def __init__(self, template: str | None = None): """Initialize with optional template. diff --git a/src/scp_sdk/testing/cli.py b/src/scp_sdk/testing/cli.py index 601185d..1f0480e 100644 --- a/src/scp_sdk/testing/cli.py +++ b/src/scp_sdk/testing/cli.py @@ -23,14 +23,17 @@ class CLITestHelper: """Helper for testing Typer CLI applications. Provides convenient methods for running CLI commands with mocked - dependencies and temporary files. + dependencies and temporary files. Wraps `typer.testing.CliRunner`. Example: - >>> from typer import Typer - >>> app = Typer() + >>> app = typer.Typer() + >>> @app.command() + >>> def hello(name: str): + >>> print(f"Hello {name}") + >>> >>> helper = CLITestHelper(app) - >>> result = helper.run(["command", "--flag"]) - >>> assert result.exit_code == 0 + >>> result = helper.run(["hello", "World"]) + >>> helper.assert_success(result, "Hello World") """ def __init__(self, app: Any): diff --git a/src/scp_sdk/testing/fixtures.py b/src/scp_sdk/testing/fixtures.py index f392390..bd0b6ba 100644 --- a/src/scp_sdk/testing/fixtures.py +++ b/src/scp_sdk/testing/fixtures.py @@ -21,12 +21,17 @@ class GraphFixture: """Factory for test graphs. + Provides pre-built graph structures for testing graph algorithms and validation. + Example: + >>> # Valid simple graph >>> graph = GraphFixture.simple_graph() - >>> assert len(graph) == 1 >>> + >>> # Graph with 3 systems in a dependency chain >>> graph = GraphFixture.with_dependencies(3) - >>> assert len(list(graph.dependencies())) > 0 + >>> + >>> # Graph with explicit validation issue + >>> graph = GraphFixture.invalid_graph("broken_dependency") """ @staticmethod From 236a66ff2f5181314d5d4cdcd69164d4185e5353 Mon Sep 17 00:00:00 2001 From: Ryan McLean Date: Fri, 16 Jan 2026 15:29:10 +0000 Subject: [PATCH 2/2] Ensure Export exports all data and not just a subset --- src/scp_sdk/core/export.py | 327 ++++++++++++++++++++++++-- tests/test_export.py | 468 ++++++++++++++++++++++++++++++++++++- 2 files changed, 776 insertions(+), 19 deletions(-) diff --git a/src/scp_sdk/core/export.py b/src/scp_sdk/core/export.py index a5c21f4..db2b28b 100644 --- a/src/scp_sdk/core/export.py +++ b/src/scp_sdk/core/export.py @@ -15,15 +15,145 @@ Capability, Dependency, SecurityExtension, + Contract, + SLA, + RetryConfig, + CircuitBreakerConfig, + Constraints, + SecurityConstraints, + ComplianceConstraints, + OperationalConstraints, + Runtime, + Environment, + KubernetesRuntime, + AWSRuntime, + FailureMode, + FailureModeThresholds, ) +def _serialize_contract(contract: Contract | None) -> dict[str, Any] | None: + """Serialize Contract to dict.""" + if not contract: + return None + return {"type": contract.type, "ref": contract.ref} + + +def _serialize_sla(sla: SLA | None) -> dict[str, Any] | None: + """Serialize SLA to dict.""" + if not sla: + return None + return { + "availability": sla.availability, + "latency_p50_ms": sla.latency_p50_ms, + "latency_p99_ms": sla.latency_p99_ms, + "throughput_rps": sla.throughput_rps, + } + + +def _serialize_retry(retry: RetryConfig | None) -> dict[str, Any] | None: + """Serialize RetryConfig to dict.""" + if not retry: + return None + return {"max_attempts": retry.max_attempts, "backoff": retry.backoff} + + +def _serialize_circuit_breaker( + cb: CircuitBreakerConfig | None, +) -> dict[str, Any] | None: + """Serialize CircuitBreakerConfig to dict.""" + if not cb: + return None + return { + "failure_threshold": cb.failure_threshold, + "reset_timeout_ms": cb.reset_timeout_ms, + } + + +def _serialize_constraints(constraints: Constraints | None) -> dict[str, Any] | None: + """Serialize Constraints to dict.""" + if not constraints: + return None + result: dict[str, Any] = {} + if constraints.security: + result["security"] = { + "authentication": constraints.security.authentication, + "data_classification": constraints.security.data_classification, + "encryption": constraints.security.encryption, + } + if constraints.compliance: + result["compliance"] = { + "frameworks": constraints.compliance.frameworks, + "data_residency": constraints.compliance.data_residency, + "retention_days": constraints.compliance.retention_days, + } + if constraints.operational: + result["operational"] = { + "max_replicas": constraints.operational.max_replicas, + "min_replicas": constraints.operational.min_replicas, + "deployment_windows": constraints.operational.deployment_windows, + } + return result if result else None + + +def _serialize_runtime(runtime: Runtime | None) -> dict[str, Any] | None: + """Serialize Runtime to dict.""" + if not runtime or not runtime.environments: + return None + envs: dict[str, Any] = {} + for name, env in runtime.environments.items(): + env_data: dict[str, Any] = { + "otel_service_name": env.otel_service_name, + "endpoints": env.endpoints, + } + if env.kubernetes: + env_data["kubernetes"] = { + "namespace": env.kubernetes.namespace, + "deployment": env.kubernetes.deployment, + "service": env.kubernetes.service, + } + if env.aws: + env_data["aws"] = { + "account_id": env.aws.account_id, + "region": env.aws.region, + "arn": env.aws.arn, + } + envs[name] = env_data + return {"environments": envs} + + +def _serialize_failure_modes( + failure_modes: list[FailureMode] | None, +) -> list[dict[str, Any]] | None: + """Serialize FailureModes to list of dicts.""" + if not failure_modes: + return None + result = [] + for fm in failure_modes: + fm_data: dict[str, Any] = { + "mode": fm.mode, + "impact": fm.impact, + "detection": fm.detection, + "recovery": fm.recovery, + "degraded_behavior": fm.degraded_behavior, + "mttr_target_minutes": fm.mttr_target_minutes, + } + if fm.thresholds: + fm_data["thresholds"] = { + "warning_ms": fm.thresholds.warning_ms, + "critical_ms": fm.thresholds.critical_ms, + } + result.append(fm_data) + return result + + def export_graph_json(manifests: list[SCPManifest]) -> dict[str, Any]: """Export manifests to unified JSON graph format. Creates a standardized graph representation with nodes and edges that can be consumed by integrations, visualizations, and analysis tools. + This export includes ALL manifest data for complete roundtrip fidelity. System nodes are deduplicated (last wins), and stub nodes are created for external dependencies. @@ -32,9 +162,9 @@ def export_graph_json(manifests: list[SCPManifest]) -> dict[str, Any]: Returns: Dictionary with 'nodes', 'edges', and 'meta' keys: - - nodes: List of system and capability nodes. Each node has 'id', 'type', and properties. - - edges: List of dependency and provides edges. Each edge has 'from', 'to', 'type'. - - meta: Counts and statistics (systems_count, capabilities_count, etc). + - nodes: List of system and capability nodes with full data + - edges: List of dependency and provides edges with resilience config + - meta: Counts and statistics Example: >>> from scp_sdk import Manifest, export_graph_json @@ -44,7 +174,7 @@ def export_graph_json(manifests: list[SCPManifest]) -> dict[str, Any]: >>> # Save to file >>> import json >>> with open("graph.json", "w") as f: - >>> json.dump(graph_data, f, indent=2) + ... json.dump(graph_data, f, indent=2) """ nodes: list[dict] = [] edges: list[dict] = [] @@ -53,17 +183,25 @@ def export_graph_json(manifests: list[SCPManifest]) -> dict[str, Any]: for manifest in manifests: urn = manifest.system.urn - # Add or update system node (replaces stub if exists) - system_nodes[urn] = { + # Build complete system node with ALL fields + system_node: dict[str, Any] = { "id": urn, "type": "System", "name": manifest.system.name, + "description": manifest.system.description, + "version": manifest.system.version, + "scp_version": manifest.scp, + # Classification fields "tier": manifest.system.classification.tier if manifest.system.classification else None, "domain": manifest.system.classification.domain if manifest.system.classification else None, + "tags": manifest.system.classification.tags + if manifest.system.classification + else None, + # Ownership fields "team": manifest.ownership.team if manifest.ownership else None, "contacts": [ {"type": c.type, "ref": c.ref} for c in manifest.ownership.contacts @@ -71,9 +209,14 @@ def export_graph_json(manifests: list[SCPManifest]) -> dict[str, Any]: if manifest.ownership and manifest.ownership.contacts else [], "escalation": manifest.ownership.escalation if manifest.ownership else [], + # Complex sections + "constraints": _serialize_constraints(manifest.constraints), + "runtime": _serialize_runtime(manifest.runtime), + "failure_modes": _serialize_failure_modes(manifest.failure_modes), } + system_nodes[urn] = system_node - # Add dependency edges (create stub only if not already known) + # Add dependency edges with full resilience config if manifest.depends: for dep in manifest.depends: # Create stub node for dependency target if not seen @@ -91,12 +234,20 @@ def export_graph_json(manifests: list[SCPManifest]) -> dict[str, Any]: "to": dep.system, "type": "DEPENDS_ON", "capability": dep.capability, + "dependency_type": dep.type, "criticality": dep.criticality, "failure_mode": dep.failure_mode, + "timeout_ms": dep.timeout_ms, + "retry": _serialize_retry(dep.retry), + "circuit_breaker": _serialize_circuit_breaker( + dep.circuit_breaker + ), + "topics": dep.topics, + "access": dep.access, } ) - # Add capability nodes and PROVIDES edges + # Add capability nodes with full contract/SLA data and PROVIDES edges if manifest.provides: for cap in manifest.provides: cap_id = f"{urn}:{cap.capability}" @@ -105,6 +256,9 @@ def export_graph_json(manifests: list[SCPManifest]) -> dict[str, Any]: "type": "Capability", "name": cap.capability, "capability_type": cap.type, + "contract": _serialize_contract(cap.contract), + "sla": _serialize_sla(cap.sla), + "topics": cap.topics, } # Include security extension if present if cap.x_security: @@ -143,8 +297,7 @@ def import_graph_json(data: dict[str, Any]) -> list[SCPManifest]: enabling transformation workflows without re-scanning source manifests. Stub nodes (external dependencies) are ignored during reconstruction. - Note: Some data loss is possible if the export format doesn't capture - every field of the original manifest (e.g., complex failure mode thresholds). + This import restores ALL fields that were exported for complete roundtrip fidelity. Args: data: Dictionary from export_graph_json() output, containing 'nodes' and 'edges' @@ -160,11 +313,11 @@ def import_graph_json(data: dict[str, Any]) -> list[SCPManifest]: >>> from scp_sdk import import_graph_json >>> >>> with open("graph.json") as f: - >>> data = json.load(f) + ... data = json.load(f) >>> >>> manifests = import_graph_json(data) >>> for manifest in manifests: - >>> print(f"Loaded {manifest.system.name}") + ... print(f"Loaded {manifest.system.name}") """ manifests: list[SCPManifest] = [] nodes = data.get("nodes", []) @@ -190,10 +343,11 @@ def import_graph_json(data: dict[str, Any]) -> list[SCPManifest]: for urn, node in system_nodes.items(): # Build classification classification = None - if node.get("tier") or node.get("domain"): + if node.get("tier") or node.get("domain") or node.get("tags"): classification = Classification( tier=node.get("tier"), domain=node.get("domain"), + tags=node.get("tags"), ) # Build ownership @@ -210,12 +364,101 @@ def import_graph_json(data: dict[str, Any]) -> list[SCPManifest]: escalation=node.get("escalation"), ) + # Build constraints + constraints = None + if node.get("constraints"): + c = node["constraints"] + security = None + compliance = None + operational = None + + if c.get("security"): + s = c["security"] + security = SecurityConstraints( + authentication=s.get("authentication"), + data_classification=s.get("data_classification"), + encryption=s.get("encryption"), + ) + if c.get("compliance"): + comp = c["compliance"] + compliance = ComplianceConstraints( + frameworks=comp.get("frameworks"), + data_residency=comp.get("data_residency"), + retention_days=comp.get("retention_days"), + ) + if c.get("operational"): + op = c["operational"] + operational = OperationalConstraints( + max_replicas=op.get("max_replicas"), + min_replicas=op.get("min_replicas"), + deployment_windows=op.get("deployment_windows"), + ) + + if security or compliance or operational: + constraints = Constraints( + security=security, + compliance=compliance, + operational=operational, + ) + + # Build runtime + runtime = None + if node.get("runtime") and node["runtime"].get("environments"): + envs: dict[str, Environment] = {} + for env_name, env_data in node["runtime"]["environments"].items(): + k8s = None + aws = None + if env_data.get("kubernetes"): + k = env_data["kubernetes"] + k8s = KubernetesRuntime( + namespace=k.get("namespace"), + deployment=k.get("deployment"), + service=k.get("service"), + ) + if env_data.get("aws"): + a = env_data["aws"] + aws = AWSRuntime( + account_id=a.get("account_id"), + region=a.get("region"), + arn=a.get("arn"), + ) + envs[env_name] = Environment( + otel_service_name=env_data.get("otel_service_name"), + endpoints=env_data.get("endpoints"), + kubernetes=k8s, + aws=aws, + ) + runtime = Runtime(environments=envs) + + # Build failure modes + failure_modes = None + if node.get("failure_modes"): + failure_modes = [] + for fm in node["failure_modes"]: + thresholds = None + if fm.get("thresholds"): + thresholds = FailureModeThresholds( + warning_ms=fm["thresholds"].get("warning_ms"), + critical_ms=fm["thresholds"].get("critical_ms"), + ) + failure_modes.append( + FailureMode( + mode=fm["mode"], + impact=fm["impact"], + detection=fm.get("detection"), + recovery=fm.get("recovery"), + degraded_behavior=fm.get("degraded_behavior"), + mttr_target_minutes=fm.get("mttr_target_minutes"), + thresholds=thresholds, + ) + ) + # Build capabilities provides = [] for edge in provides_by_system.get(urn, []): cap_node = capability_nodes.get(edge["to"]) if cap_node: - # Check for security extension in capability node + # Rebuild security extension if present x_security = None if cap_node.get("x_security"): sec = cap_node["x_security"] @@ -225,37 +468,87 @@ def import_graph_json(data: dict[str, Any]) -> list[SCPManifest]: targets=sec.get("targets", []), ) - cap_data = { + # Rebuild contract if present + contract = None + if cap_node.get("contract"): + cont = cap_node["contract"] + contract = Contract(type=cont.get("type"), ref=cont.get("ref")) + + # Rebuild SLA if present + sla = None + if cap_node.get("sla"): + s = cap_node["sla"] + sla = SLA( + availability=s.get("availability"), + latency_p50_ms=s.get("latency_p50_ms"), + latency_p99_ms=s.get("latency_p99_ms"), + throughput_rps=s.get("throughput_rps"), + ) + + cap_data: dict[str, Any] = { "capability": cap_node["name"], "type": cap_node.get("capability_type", "rest"), + "topics": cap_node.get("topics"), } if x_security: cap_data["x-security"] = x_security + if contract: + cap_data["contract"] = contract + if sla: + cap_data["sla"] = sla provides.append(Capability.model_validate(cap_data)) # Build dependencies depends = [] for edge in depends_by_system.get(urn, []): + # Rebuild retry config if present + retry = None + if edge.get("retry"): + r = edge["retry"] + retry = RetryConfig( + max_attempts=r.get("max_attempts"), + backoff=r.get("backoff"), + ) + + # Rebuild circuit breaker if present + circuit_breaker = None + if edge.get("circuit_breaker"): + cb = edge["circuit_breaker"] + circuit_breaker = CircuitBreakerConfig( + failure_threshold=cb.get("failure_threshold"), + reset_timeout_ms=cb.get("reset_timeout_ms"), + ) + depends.append( Dependency( system=edge["to"], capability=edge.get("capability"), - type="rest", # Default, as type isn't stored in edge + type=edge.get("dependency_type", "rest"), criticality=edge.get("criticality", "required"), failure_mode=edge.get("failure_mode"), + timeout_ms=edge.get("timeout_ms"), + retry=retry, + circuit_breaker=circuit_breaker, + topics=edge.get("topics"), + access=edge.get("access"), ) ) manifest = SCPManifest( - scp="0.1.0", + scp=node.get("scp_version", "0.1.0"), system=System( urn=urn, name=node["name"], + description=node.get("description"), + version=node.get("version"), classification=classification, ), ownership=ownership, provides=provides if provides else None, depends=depends if depends else None, + constraints=constraints, + runtime=runtime, + failure_modes=failure_modes if failure_modes else None, ) manifests.append(manifest) diff --git a/tests/test_export.py b/tests/test_export.py index cd56520..aedaca1 100644 --- a/tests/test_export.py +++ b/tests/test_export.py @@ -11,6 +11,23 @@ export_graph_json, import_graph_json, ) +from scp_sdk.core.models import ( + Contract, + SLA, + RetryConfig, + CircuitBreakerConfig, + Constraints, + SecurityConstraints, + ComplianceConstraints, + OperationalConstraints, + Runtime, + Environment, + KubernetesRuntime, + AWSRuntime, + FailureMode, + FailureModeThresholds, + SecurityExtension, +) def test_export_simple_manifest(): @@ -129,8 +146,6 @@ def test_export_with_capabilities(): def test_roundtrip_with_security_extension(): """Test that x-security extension survives export/import roundtrip.""" - from scp_sdk import SecurityExtension - manifest = SCPManifest( scp="0.1.0", system=System(urn="urn:scp:test:security-tool", name="Security Tool"), @@ -318,3 +333,452 @@ def test_roundtrip_preservation(): assert manifest.system.name == original.system.name assert manifest.system.classification.tier == original.system.classification.tier assert manifest.ownership.team == original.ownership.team + + +# ============================================================================= +# New tests for complete field coverage +# ============================================================================= + + +def test_export_with_system_description_and_version(): + """Test exporting manifest with system description, version and tags.""" + manifest = SCPManifest( + scp="0.1.0", + system=System( + urn="urn:scp:test:service-a", + name="Service A", + description="A test service for payments", + version="2.3.1", + classification=Classification( + tier=1, domain="payments", tags=["critical", "pci"] + ), + ), + ) + + result = export_graph_json([manifest]) + node = result["nodes"][0] + + assert node["description"] == "A test service for payments" + assert node["version"] == "2.3.1" + assert node["scp_version"] == "0.1.0" + assert node["tags"] == ["critical", "pci"] + + +def test_export_with_capability_contract_and_sla(): + """Test exporting manifest with capability contract and SLA.""" + manifest = SCPManifest( + scp="0.1.0", + system=System(urn="urn:scp:test:service-a", name="Service A"), + provides=[ + Capability( + capability="payment-api", + type="rest", + contract=Contract(type="openapi", ref="./openapi.yaml"), + sla=SLA( + availability="99.95%", + latency_p50_ms=50, + latency_p99_ms=200, + throughput_rps=1000, + ), + topics=["payments.completed", "payments.failed"], + ) + ], + ) + + result = export_graph_json([manifest]) + cap = next(n for n in result["nodes"] if n["type"] == "Capability") + + assert cap["contract"] == {"type": "openapi", "ref": "./openapi.yaml"} + assert cap["sla"]["availability"] == "99.95%" + assert cap["sla"]["latency_p50_ms"] == 50 + assert cap["sla"]["latency_p99_ms"] == 200 + assert cap["sla"]["throughput_rps"] == 1000 + assert cap["topics"] == ["payments.completed", "payments.failed"] + + +def test_export_with_dependency_resilience_config(): + """Test exporting manifest with full dependency resilience configuration.""" + manifest = SCPManifest( + scp="0.1.0", + system=System(urn="urn:scp:test:service-a", name="Service A"), + depends=[ + Dependency( + system="urn:scp:test:database", + capability="user-data", + type="data", + criticality="required", + failure_mode="circuit-break", + timeout_ms=5000, + retry=RetryConfig(max_attempts=3, backoff="exponential"), + circuit_breaker=CircuitBreakerConfig( + failure_threshold=5, reset_timeout_ms=30000 + ), + topics=["users.updates"], + access="read-write", + ) + ], + ) + + result = export_graph_json([manifest]) + edge = result["edges"][0] + + assert edge["dependency_type"] == "data" + assert edge["timeout_ms"] == 5000 + assert edge["retry"] == {"max_attempts": 3, "backoff": "exponential"} + assert edge["circuit_breaker"] == { + "failure_threshold": 5, + "reset_timeout_ms": 30000, + } + assert edge["topics"] == ["users.updates"] + assert edge["access"] == "read-write" + + +def test_export_with_constraints(): + """Test exporting manifest with full constraints section.""" + manifest = SCPManifest( + scp="0.1.0", + system=System(urn="urn:scp:test:service-a", name="Service A"), + constraints=Constraints( + security=SecurityConstraints( + authentication=["oauth2", "mTLS"], + data_classification="confidential", + encryption={"at_rest": True, "in_transit": True}, + ), + compliance=ComplianceConstraints( + frameworks=["SOC2", "HIPAA"], + data_residency=["US", "EU"], + retention_days=365, + ), + operational=OperationalConstraints( + max_replicas=10, + min_replicas=2, + deployment_windows=["mon-fri 02:00-04:00 UTC"], + ), + ), + ) + + result = export_graph_json([manifest]) + node = result["nodes"][0] + + assert node["constraints"]["security"]["authentication"] == ["oauth2", "mTLS"] + assert node["constraints"]["security"]["data_classification"] == "confidential" + assert node["constraints"]["security"]["encryption"] == { + "at_rest": True, + "in_transit": True, + } + assert node["constraints"]["compliance"]["frameworks"] == ["SOC2", "HIPAA"] + assert node["constraints"]["compliance"]["data_residency"] == ["US", "EU"] + assert node["constraints"]["compliance"]["retention_days"] == 365 + assert node["constraints"]["operational"]["max_replicas"] == 10 + assert node["constraints"]["operational"]["min_replicas"] == 2 + + +def test_export_with_runtime(): + """Test exporting manifest with runtime environments.""" + manifest = SCPManifest( + scp="0.1.0", + system=System(urn="urn:scp:test:service-a", name="Service A"), + runtime=Runtime( + environments={ + "production": Environment( + otel_service_name="service-a-prod", + endpoints=["https://api.example.com"], + kubernetes=KubernetesRuntime( + namespace="production", + deployment="service-a", + service="service-a-svc", + ), + aws=AWSRuntime( + account_id="123456789012", + region="us-west-2", + arn="arn:aws:ecs:us-west-2:123456789012:service/service-a", + ), + ), + "staging": Environment( + otel_service_name="service-a-staging", + endpoints=["https://staging.example.com"], + ), + } + ), + ) + + result = export_graph_json([manifest]) + node = result["nodes"][0] + + prod = node["runtime"]["environments"]["production"] + assert prod["otel_service_name"] == "service-a-prod" + assert prod["endpoints"] == ["https://api.example.com"] + assert prod["kubernetes"]["namespace"] == "production" + assert prod["kubernetes"]["deployment"] == "service-a" + assert prod["aws"]["account_id"] == "123456789012" + assert prod["aws"]["region"] == "us-west-2" + + staging = node["runtime"]["environments"]["staging"] + assert staging["otel_service_name"] == "service-a-staging" + + +def test_export_with_failure_modes(): + """Test exporting manifest with failure modes.""" + manifest = SCPManifest( + scp="0.1.0", + system=System(urn="urn:scp:test:service-a", name="Service A"), + failure_modes=[ + FailureMode( + mode="database-latency-spike", + impact="degraded-experience", + detection="P99 latency > 500ms for 5 minutes", + recovery="Scale up database read replicas", + degraded_behavior="Slower response times, cached data served", + mttr_target_minutes=15, + thresholds=FailureModeThresholds(warning_ms=200, critical_ms=500), + ), + FailureMode( + mode="cache-miss-storm", + impact="partial-outage", + detection="Cache hit ratio < 50%", + recovery="Warm cache, increase TTL", + ), + ], + ) + + result = export_graph_json([manifest]) + node = result["nodes"][0] + + assert len(node["failure_modes"]) == 2 + + fm1 = node["failure_modes"][0] + assert fm1["mode"] == "database-latency-spike" + assert fm1["impact"] == "degraded-experience" + assert fm1["detection"] == "P99 latency > 500ms for 5 minutes" + assert fm1["recovery"] == "Scale up database read replicas" + assert fm1["degraded_behavior"] == "Slower response times, cached data served" + assert fm1["mttr_target_minutes"] == 15 + assert fm1["thresholds"]["warning_ms"] == 200 + assert fm1["thresholds"]["critical_ms"] == 500 + + fm2 = node["failure_modes"][1] + assert fm2["mode"] == "cache-miss-storm" + assert fm2["impact"] == "partial-outage" + + +def test_complete_roundtrip_preservation(): + """Test that a fully-populated manifest survives export->import roundtrip.""" + original = SCPManifest( + scp="0.2.0", + system=System( + urn="urn:scp:test:complete-service", + name="Complete Service", + description="A fully-featured service for testing", + version="3.0.0", + classification=Classification( + tier=1, domain="core", tags=["mission-critical"] + ), + ), + ownership=Ownership( + team="platform-team", + contacts=[ + Contact(type="slack", ref="#platform-oncall"), + Contact(type="pagerduty", ref="PABCD12"), + ], + escalation=["tech-lead", "engineering-manager", "vp-engineering"], + ), + provides=[ + Capability( + capability="main-api", + type="rest", + contract=Contract(type="openapi", ref="./api.yaml"), + sla=SLA(availability="99.99%", latency_p99_ms=100), + topics=["events.published"], + ), + ], + depends=[ + Dependency( + system="urn:scp:test:auth-service", + capability="verify-token", + type="grpc", + criticality="required", + failure_mode="fail-fast", + timeout_ms=100, + retry=RetryConfig(max_attempts=2, backoff="exponential"), + ), + ], + constraints=Constraints( + security=SecurityConstraints( + authentication=["mTLS"], + data_classification="restricted", + ), + compliance=ComplianceConstraints( + frameworks=["PCI-DSS"], + retention_days=2555, + ), + operational=OperationalConstraints( + min_replicas=3, + max_replicas=20, + ), + ), + runtime=Runtime( + environments={ + "production": Environment( + otel_service_name="complete-service-prod", + endpoints=["https://api.prod.example.com"], + kubernetes=KubernetesRuntime( + namespace="prod", deployment="complete-svc" + ), + ), + } + ), + failure_modes=[ + FailureMode( + mode="upstream-timeout", + impact="degraded-experience", + detection="Error rate > 1%", + recovery="Failover to backup", + thresholds=FailureModeThresholds(warning_ms=50, critical_ms=100), + ), + ], + ) + + # Export then import + exported = export_graph_json([original]) + imported = import_graph_json(exported) + + # Verify complete preservation + assert len(imported) == 1 + m = imported[0] + + # System + assert m.scp == "0.2.0" + assert m.system.urn == original.system.urn + assert m.system.name == original.system.name + assert m.system.description == original.system.description + assert m.system.version == original.system.version + assert m.system.classification.tier == 1 + assert m.system.classification.domain == "core" + assert m.system.classification.tags == ["mission-critical"] + + # Ownership + assert m.ownership.team == "platform-team" + assert len(m.ownership.contacts) == 2 + assert m.ownership.escalation == [ + "tech-lead", + "engineering-manager", + "vp-engineering", + ] + + # Capabilities + assert len(m.provides) == 1 + cap = m.provides[0] + assert cap.capability == "main-api" + assert cap.type == "rest" + assert cap.contract.type == "openapi" + assert cap.contract.ref == "./api.yaml" + assert cap.sla.availability == "99.99%" + assert cap.sla.latency_p99_ms == 100 + assert cap.topics == ["events.published"] + + # Dependencies + assert len(m.depends) == 1 + dep = m.depends[0] + assert dep.system == "urn:scp:test:auth-service" + assert dep.capability == "verify-token" + assert dep.type == "grpc" + assert dep.criticality == "required" + assert dep.failure_mode == "fail-fast" + assert dep.timeout_ms == 100 + assert dep.retry.max_attempts == 2 + assert dep.retry.backoff == "exponential" + + # Constraints + assert m.constraints.security.authentication == ["mTLS"] + assert m.constraints.security.data_classification == "restricted" + assert m.constraints.compliance.frameworks == ["PCI-DSS"] + assert m.constraints.compliance.retention_days == 2555 + assert m.constraints.operational.min_replicas == 3 + assert m.constraints.operational.max_replicas == 20 + + # Runtime + assert "production" in m.runtime.environments + prod = m.runtime.environments["production"] + assert prod.otel_service_name == "complete-service-prod" + assert prod.endpoints == ["https://api.prod.example.com"] + assert prod.kubernetes.namespace == "prod" + assert prod.kubernetes.deployment == "complete-svc" + + # Failure modes + assert len(m.failure_modes) == 1 + fm = m.failure_modes[0] + assert fm.mode == "upstream-timeout" + assert fm.impact == "degraded-experience" + assert fm.thresholds.warning_ms == 50 + assert fm.thresholds.critical_ms == 100 + + +def test_roundtrip_with_capability_contract_and_sla(): + """Test that capability contract and SLA survive roundtrip.""" + original = SCPManifest( + scp="0.1.0", + system=System(urn="urn:scp:test:api-service", name="API Service"), + provides=[ + Capability( + capability="data-api", + type="graphql", + contract=Contract(type="graphql", ref="./schema.graphql"), + sla=SLA( + availability="99.9%", + latency_p50_ms=25, + latency_p99_ms=150, + throughput_rps=5000, + ), + ) + ], + ) + + exported = export_graph_json([original]) + imported = import_graph_json(exported) + + cap = imported[0].provides[0] + assert cap.contract.type == "graphql" + assert cap.contract.ref == "./schema.graphql" + assert cap.sla.availability == "99.9%" + assert cap.sla.latency_p50_ms == 25 + assert cap.sla.latency_p99_ms == 150 + assert cap.sla.throughput_rps == 5000 + + +def test_roundtrip_with_dependency_resilience(): + """Test that dependency resilience config survives roundtrip.""" + original = SCPManifest( + scp="0.1.0", + system=System(urn="urn:scp:test:consumer", name="Consumer"), + depends=[ + Dependency( + system="urn:scp:test:provider", + capability="events", + type="event", + criticality="degraded", + failure_mode="queue-buffer", + timeout_ms=10000, + retry=RetryConfig(max_attempts=5, backoff="linear"), + circuit_breaker=CircuitBreakerConfig( + failure_threshold=10, reset_timeout_ms=60000 + ), + topics=["orders.created", "orders.updated"], + access="read", + ) + ], + ) + + exported = export_graph_json([original]) + imported = import_graph_json(exported) + + dep = imported[0].depends[0] + assert dep.type == "event" + assert dep.criticality == "degraded" + assert dep.failure_mode == "queue-buffer" + assert dep.timeout_ms == 10000 + assert dep.retry.max_attempts == 5 + assert dep.retry.backoff == "linear" + assert dep.circuit_breaker.failure_threshold == 10 + assert dep.circuit_breaker.reset_timeout_ms == 60000 + assert dep.topics == ["orders.created", "orders.updated"] + assert dep.access == "read"