diff --git a/examples/ros2-lifecycle-guard/README.md b/examples/ros2-lifecycle-guard/README.md index f713206..a387125 100644 --- a/examples/ros2-lifecycle-guard/README.md +++ b/examples/ros2-lifecycle-guard/README.md @@ -28,4 +28,10 @@ Expected output includes a short transcript from the ROS 2 container showing: publishes to ROS topics requires system-level containment (SROS2 enclaves, network isolation, or a transport-level interceptor). The purpose here is to make the enforcement-context pattern concrete and runnable. +- For the matching **transport-level** recipe — SROS2 keystore + enclaves, + `ROS_SECURITY_STRATEGY=Enforce`, and a runnable bypass-attempt conformance + check — see [`../ros2-transport-hardening`](../ros2-transport-hardening/). + The two demos are intended to be read together: this one answers "what is + the policy decision surface?"; the other answers "what makes it + non-bypassable at the wire?". diff --git a/examples/ros2-transport-hardening/Dockerfile b/examples/ros2-transport-hardening/Dockerfile new file mode 100644 index 0000000..5a268c2 --- /dev/null +++ b/examples/ros2-transport-hardening/Dockerfile @@ -0,0 +1,24 @@ +FROM ros:humble-ros-base + +SHELL ["/bin/bash", "-lc"] + +# SROS2 tooling lives in ros-humble-sros2; openssl is used by keystore-init.sh +# to re-sign custom permissions.xml overlays. +RUN apt-get update \ + && apt-get install -y --no-install-recommends \ + python3 \ + python3-colcon-common-extensions \ + ros-humble-sros2 \ + ros-humble-geometry-msgs \ + openssl \ + ca-certificates \ + curl \ + && rm -rf /var/lib/apt/lists/* + +WORKDIR /demo +COPY keystore-init.sh guarded_talker.py attacker.py ./ +RUN chmod +x keystore-init.sh + +# Default to Fast DDS with security plugins enabled (rmw_fastrtps_cpp is +# shipped with ros:humble-ros-base). +ENV RMW_IMPLEMENTATION=rmw_fastrtps_cpp diff --git a/examples/ros2-transport-hardening/README.md b/examples/ros2-transport-hardening/README.md new file mode 100644 index 0000000..45da285 --- /dev/null +++ b/examples/ros2-transport-hardening/README.md @@ -0,0 +1,139 @@ +# ROS 2 Transport-Level Non-Bypass Hardening + +This demo is the transport-hardened counterpart to +[`ros2-lifecycle-guard`](../ros2-lifecycle-guard/). That one shows the +**application-level** pattern: a node voluntarily calls +`PolicyGateway.intercept()` before acting. This one shows why that is not +enough on its own, and what to layer underneath so that bypass is not +materially possible. + +> **Layered model — both layers are needed.** +> +> | Layer | Enforced by | What it catches | What it cannot catch | +> | --- | --- | --- | --- | +> | Application | `PolicyGateway.intercept()` call in the guarded node | Semantic policy: token, constraints, physical context, tier | A *different* process on the same DDS domain publishing directly | +> | Transport | SROS2 enclaves + DDS Secure (authentication + access control) | Any unauthenticated or unauthorized DDS participant | Application-semantic policy — DDS has no notion of "velocity cap" | +> +> `ros2-lifecycle-guard` covers the top row. This demo adds the bottom row. + +## What gets stood up + +- `gateway` — the SINT Policy Gateway (app-layer PDP). +- `keystore-init` — one-shot container that builds an SROS2 keystore with: + - `/guarded_talker` enclave: `publish rt/cmd_vel` only (permissions.xml overlay re-signed with the permissions CA). + - `/guarded_camera` enclave: `subscribe rt/camera/front` only. +- `guarded-talker` — ROS 2 node running under the `/guarded_talker` enclave + with `ROS_SECURITY_ENABLE=true ROS_SECURITY_STRATEGY=Enforce`. Every + publish is preceded by an `intercept()` call. This is the reference + *defender*. +- `attacker` — same network, same `ROS_DOMAIN_ID`, Enforce mode, but + **no keystore mount and no enclave**. This is the reference *bypass + attempt*: a compromised process on the same network trying to inject a + `/cmd_vel` command directly. + +## Run + +```bash +# From this directory: +docker compose up --build -d postgres redis gateway keystore-init guarded-talker +docker compose logs -f guarded-talker # watch policy decisions +docker compose run --rm attacker # reference bypass attempt +``` + +## Automated conformance check + +```bash +./conformance-check.sh +``` + +The script fails if the attacker container exits `0` (meaning a bypass +publish reached the guarded topic). It passes if the attacker exits `2` +(no matched subscribers after 10s of discovery) or `3` (rmw refused to +create the participant). Both are valid hardening outcomes: + +- **Exit 3** is the strongest rejection: the rmw security plugin refuses + participant creation because the attacker has no identity cert. +- **Exit 2** is the next-strongest: the attacker creates a local + participant but DDS Secure discovery never matches it to the guarded + subscriber, so the publish traffic never leaves its own process. + +Neither exit code indicates a successful injection into the guarded +topic. + +## Key generation recipe (manual reference) + +The `keystore-init` container runs `keystore-init.sh`. The inlined +commands are: + +```bash +source /opt/ros/humble/setup.bash + +# 1. Build the keystore and its internal CAs. +ros2 security create_keystore /keystore + +# 2. Create an enclave per trusted node identity. +ros2 security create_enclave /keystore /guarded_talker +ros2 security create_enclave /keystore /guarded_camera + +# 3. Overlay custom permissions.xml to restrict each enclave to its +# declared topic, then re-sign with the permissions CA: +openssl smime -sign \ + -in /keystore/enclaves/guarded_talker/permissions.xml \ + -text -out /keystore/enclaves/guarded_talker/permissions.p7s \ + -signer /keystore/public/permissions_ca.cert.pem \ + -inkey /keystore/private/permissions_ca.key.pem +``` + +## Launching a guarded node + +Any ROS 2 node can become a guarded node with three environment +variables and a matching enclave: + +```bash +export ROS_SECURITY_ENABLE=true +export ROS_SECURITY_STRATEGY=Enforce # NOT Permissive +export ROS_SECURITY_KEYSTORE=/keystore +# Then pass --enclave /guarded_talker when launching. +``` + +Under `Enforce`, rmw refuses to bring up a participant whose enclave is +missing, expired, or whose permissions.xml disallows the requested +topic — which is exactly the property the attacker container is +exercising. + +## Optional: namespace separation + network policies + +If you deploy on Kubernetes or a multi-subnet environment, pair SROS2 +with one or both of: + +- A distinct `ROS_DOMAIN_ID` per trust boundary (each domain is a + separate DDS multicast plane; nodes on different domains cannot even + discover each other). +- A `NetworkPolicy` that restricts DDS ports (default `7400-7500`) to + the pods with a `sint-enclave=` label. + +These are defense-in-depth layers on top of SROS2, not replacements for +it. + +## Optional: gateway-side endpoint allowlist + +For air-gapped or one-way deployments, the Policy Gateway can be +configured to reject `ros2://` resources outside an operator-supplied +allowlist (e.g. `ros2:///cmd_vel`, `ros2:///camera/*`). This is useful +as a belt-and-braces check against tokens being misused against +topics that were never meant to be reachable. See +[`packages/policy-gateway`](../../packages/policy-gateway) for the +policy hook. + +## Limitations + +- This demo is deliberately small: one talker enclave, one subscribe + enclave, two containers. A real deployment needs one enclave per + distinct node identity and a governance.xml matching the + operator's risk posture. +- A full ROS 2 middleware interceptor (a hard man-in-the-middle that + the node cannot go around) is out of scope — see the non-goals in + issue #161. +- SROS2 only covers DDS traffic. If your agent has *other* I/O (HTTP + clients, file system, subprocess) you need a complementary sandbox + (e.g. seccomp, AppArmor, or the `NemoClaw` runtime). diff --git a/examples/ros2-transport-hardening/attacker.py b/examples/ros2-transport-hardening/attacker.py new file mode 100644 index 0000000..2224638 --- /dev/null +++ b/examples/ros2-transport-hardening/attacker.py @@ -0,0 +1,77 @@ +"""Unauthorized attacker: attempts to publish /cmd_vel without going through +the policy gateway and without an enclave that allows it. + +Expected behavior under the hardened configuration: + * ROS_SECURITY_ENABLE=true + Enforce means the DDS participant has no + credentials — discovery drops it, no publish ever lands on the guarded + side. + * The script still calls `publish()` and then waits, but no subscriber + will match because mutual auth fails. We detect this by counting the + number of matched subscribers after a brief wait. + +Exit codes (consumed by conformance-check.sh): + 0 — attacker successfully published (HARDENING FAILED, should never + happen under Enforce). + 2 — attacker could not find any matched subscriber (EXPECTED). + 3 — attacker could not even initialize an authenticated participant + (EXPECTED; strongest form of rejection). +""" + +from __future__ import annotations + +import os +import sys +import time + + +def main() -> int: + # Explicitly NOT routing through the Policy Gateway. We are simulating a + # compromised process on the same network trying to inject commands. + try: + import rclpy + from geometry_msgs.msg import Twist + except Exception as exc: + print(f"[attacker] rclpy import failed: {exc}", file=sys.stderr) + return 3 + + try: + rclpy.init(args=sys.argv) + except Exception as exc: + # When security is enforced and we lack credentials, rmw rejects + # participant creation outright. This is the strongest rejection. + print(f"[attacker] rclpy.init refused by rmw: {exc}", file=sys.stderr) + print("[attacker] BLOCKED at participant creation") + return 3 + + try: + # No enclave → defaults to /, which has no permissions in our + # keystore, so the participant is either refused or isolated. + node = rclpy.create_node("bypass_attacker") + pub = node.create_publisher(Twist, "cmd_vel", 10) + + deadline = time.time() + 10.0 + while time.time() < deadline: + rclpy.spin_once(node, timeout_sec=0.2) + if pub.get_subscription_count() > 0: + # If we see a matched subscriber, try to publish and then + # surface the count so the conformance check can assert. + msg = Twist() + msg.linear.x = 1.0 + pub.publish(msg) + print(f"[attacker] matched_subscribers={pub.get_subscription_count()} — BYPASS SUCCEEDED") + node.destroy_node() + return 0 + + matched = pub.get_subscription_count() + print(f"[attacker] matched_subscribers={matched} after 10s — BLOCKED by transport") + node.destroy_node() + return 2 + finally: + try: + rclpy.shutdown() + except Exception: + pass + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/examples/ros2-transport-hardening/conformance-check.sh b/examples/ros2-transport-hardening/conformance-check.sh new file mode 100755 index 0000000..809c2bd --- /dev/null +++ b/examples/ros2-transport-hardening/conformance-check.sh @@ -0,0 +1,74 @@ +#!/usr/bin/env bash +# Conformance check for transport-level non-bypass hardening. +# +# Orchestration (run from this directory): +# 1. Build and start all services via docker compose. +# 2. Wait for the guarded_talker to log "[guarded_talker] OK". +# 3. Read the attacker container's exit code. +# +# Pass criteria: +# - guarded_talker logged OK (policy + transport round-trip worked) +# - attacker exit code ∈ {2, 3} +# (blocked at either subscriber-match or participant-creation stage) +# +# Fail criteria: +# - attacker exit code == 0 (bypass succeeded — regression) +# - guarded_talker never reached OK (demo itself is broken) + +set -euo pipefail + +HERE="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +cd "${HERE}" + +COMPOSE=(docker compose -f docker-compose.yml) + +cleanup() { + "${COMPOSE[@]}" down -v --remove-orphans >/dev/null 2>&1 || true +} +trap cleanup EXIT + +echo "[conformance] building images" +"${COMPOSE[@]}" build + +echo "[conformance] starting infra + guarded services" +"${COMPOSE[@]}" up -d postgres redis gateway keystore-init +"${COMPOSE[@]}" up -d guarded-talker + +echo "[conformance] waiting for guarded_talker OK sentinel" +deadline=$(( SECONDS + 90 )) +while (( SECONDS < deadline )); do + if "${COMPOSE[@]}" logs guarded-talker 2>/dev/null | grep -q "\[guarded_talker\] OK"; then + echo "[conformance] guarded_talker reached OK" + break + fi + sleep 2 +done + +if ! "${COMPOSE[@]}" logs guarded-talker 2>/dev/null | grep -q "\[guarded_talker\] OK"; then + echo "[conformance] FAIL: guarded_talker never logged OK" >&2 + "${COMPOSE[@]}" logs guarded-talker + exit 1 +fi + +echo "[conformance] running attacker (expect blocked)" +set +e +"${COMPOSE[@]}" run --rm attacker +attacker_exit=$? +set -e + +echo "[conformance] attacker exit code: ${attacker_exit}" + +case "${attacker_exit}" in + 2|3) + echo "[conformance] PASS: attacker blocked at transport (exit ${attacker_exit})" + exit 0 + ;; + 0) + echo "[conformance] FAIL: attacker bypass succeeded" >&2 + exit 1 + ;; + *) + echo "[conformance] FAIL: unexpected attacker exit ${attacker_exit}" >&2 + exit 1 + ;; +esac diff --git a/examples/ros2-transport-hardening/docker-compose.yml b/examples/ros2-transport-hardening/docker-compose.yml new file mode 100644 index 0000000..4663fb3 --- /dev/null +++ b/examples/ros2-transport-hardening/docker-compose.yml @@ -0,0 +1,125 @@ +version: "3.9" + +# Transport-level hardening demo. +# +# Layout: +# gateway — SINT Policy Gateway (application-layer decision point) +# keystore-init — one-shot: builds SROS2 keystore + enclaves in /keystore +# guarded-talker — runs with ROS_SECURITY_ENABLE=true Enforce, mounted keystore +# attacker — same network + domain, NO keystore / enclave +# +# The gateway proves application-layer enforcement; the attacker proves +# transport-layer enforcement (SROS2/DDS Security) is what actually prevents +# an out-of-guard publisher from reaching the guarded topic. + +services: + gateway: + build: + context: ../.. + dockerfile: apps/gateway-server/Dockerfile + ports: + - "3100:3100" + environment: + SINT_PORT: "3100" + SINT_STORE: postgres + SINT_CACHE: redis + DATABASE_URL: postgresql://sint:sint@postgres:5432/sint + REDIS_URL: redis://redis:6379 + SINT_LOG_LEVEL: info + depends_on: + postgres: + condition: service_healthy + redis: + condition: service_healthy + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:3100/v1/health"] + interval: 10s + timeout: 5s + start_period: 10s + retries: 10 + + postgres: + image: postgres:16-alpine + environment: + POSTGRES_USER: sint + POSTGRES_PASSWORD: sint + POSTGRES_DB: sint + volumes: + - pgdata:/var/lib/postgresql/data + - ../../packages/persistence/migrations:/docker-entrypoint-initdb.d:ro + healthcheck: + test: ["CMD-SHELL", "pg_isready -U sint -d sint"] + interval: 5s + timeout: 5s + start_period: 10s + retries: 10 + + redis: + image: redis:7-alpine + command: redis-server --appendonly yes + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 5s + timeout: 5s + start_period: 5s + retries: 10 + + # One-shot: populate /keystore with CA + per-enclave material. + keystore-init: + build: + context: . + dockerfile: Dockerfile + command: ["bash", "-lc", "/demo/keystore-init.sh"] + volumes: + - keystore:/keystore + + # Guarded node: app-level policy intercept + transport-level enclave. + guarded-talker: + build: + context: . + dockerfile: Dockerfile + environment: + SINT_GATEWAY_URL: http://gateway:3100 + ROS_SECURITY_ENABLE: "true" + ROS_SECURITY_STRATEGY: "Enforce" + ROS_SECURITY_KEYSTORE: /keystore + ROS_DOMAIN_ID: "0" + volumes: + - keystore:/keystore:ro + depends_on: + gateway: + condition: service_healthy + keystore-init: + condition: service_completed_successfully + command: + - "bash" + - "-lc" + - "source /opt/ros/humble/setup.bash && python3 /demo/guarded_talker.py && sleep infinity" + + # Attacker: same LAN + domain, but NO keystore mount → no credentials. + # Conformance check expects this container to exit 2 or 3 (blocked). + attacker: + build: + context: . + dockerfile: Dockerfile + environment: + ROS_SECURITY_ENABLE: "true" + ROS_SECURITY_STRATEGY: "Enforce" + # Intentionally point at a bogus keystore so the participant cannot + # authenticate. Deleting the var entirely also works (rmw picks up + # the default) but this is explicit. + ROS_SECURITY_KEYSTORE: /nonexistent + ROS_DOMAIN_ID: "0" + depends_on: + guarded-talker: + condition: service_started + profiles: + - attacker + command: + - "bash" + - "-lc" + - "source /opt/ros/humble/setup.bash && python3 /demo/attacker.py" + +volumes: + pgdata: + keystore: diff --git a/examples/ros2-transport-hardening/guarded_talker.py b/examples/ros2-transport-hardening/guarded_talker.py new file mode 100644 index 0000000..8abab0c --- /dev/null +++ b/examples/ros2-transport-hardening/guarded_talker.py @@ -0,0 +1,156 @@ +"""Guarded talker: policy-gated /cmd_vel publisher with SROS2 enclave. + +Two layers of defense: + +1. Application: every publish is preceded by `PolicyGateway.intercept()`. +2. Transport: the node runs under a dedicated SROS2 enclave + (`/guarded_talker`) whose permissions.xml only allows it to publish + `rt/cmd_vel`. DDS Secure rejects anything else at the network layer. + +A container without the keystore mounted — or with a different enclave — +cannot complete DDS authenticated discovery and its publish attempts are +dropped before any ROS code runs on the guarded side. +""" + +from __future__ import annotations + +import json +import os +import secrets +import sys +import time +import urllib.error +import urllib.request +from typing import Any + + +def iso8601_utc_micro() -> str: + return ( + time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime()) + + f".{int(time.time_ns() % 1_000_000_000 / 1_000):06d}Z" + ) + + +def uuid_v7() -> str: + unix_ms = int(time.time() * 1000) + rand_a = secrets.randbits(12) + rand_b = secrets.token_bytes(8) + time_high = unix_ms & 0xFFFFFFFFFFFF + a = (time_high << 16) | (0x7000 | rand_a) + b0 = (rand_b[0] & 0x3F) | 0x80 + b = bytes([b0]) + rand_b[1:] + hex_a = f"{a:016x}" + hex_b = b.hex() + return f"{hex_a[0:8]}-{hex_a[8:12]}-{hex_a[12:16]}-{hex_b[0:4]}-{hex_b[4:16]}" + + +def http_json(method: str, url: str, payload: dict | None) -> dict: + data = None if payload is None else json.dumps(payload).encode("utf-8") + req = urllib.request.Request(url, data=data, method=method) + req.add_header("content-type", "application/json") + with urllib.request.urlopen(req, timeout=5) as resp: + return json.loads(resp.read().decode("utf-8")) + + +def intercept(gateway_url: str, body: dict) -> dict: + try: + return http_json("POST", f"{gateway_url}/v1/intercept", body) + except Exception as exc: + return { + "action": "deny", + "assignedTier": "T3_COMMIT", + "denial": {"reason": f"gateway_unreachable: {type(exc).__name__}"}, + } + + +def issue_cmd_vel_token(gateway_url: str) -> tuple[dict, dict]: + root = http_json("POST", f"{gateway_url}/v1/keypair", {}) + agent = http_json("POST", f"{gateway_url}/v1/keypair", {}) + expires_at = time.strftime( + "%Y-%m-%dT%H:%M:%S", time.gmtime(time.time() + 2 * 3600) + ) + f".{int(time.time_ns() % 1_000_000_000 / 1_000):06d}Z" + token = http_json( + "POST", + f"{gateway_url}/v1/tokens", + { + "request": { + "issuer": root["publicKey"], + "subject": agent["publicKey"], + "resource": "ros2:///cmd_vel", + "actions": ["publish"], + "constraints": {"maxVelocityMps": 0.2}, + "delegationChain": {"parentTokenId": None, "depth": 0, "attenuated": False}, + "expiresAt": expires_at, + "revocable": True, + }, + "privateKey": root["privateKey"], + }, + ) + return agent, token + + +def publish_via_ros2(value_x: float, enclave: str) -> None: + """Publish a single Twist message under the given SROS2 enclave.""" + import rclpy + from geometry_msgs.msg import Twist + + rclpy.init(args=sys.argv) + try: + node = rclpy.create_node("guarded_talker", enclave=enclave) + pub = node.create_publisher(Twist, "cmd_vel", 10) + msg = Twist() + msg.linear.x = value_x + # Publish a handful of times to ensure matching via discovery. + for _ in range(5): + pub.publish(msg) + rclpy.spin_once(node, timeout_sec=0.1) + node.destroy_node() + finally: + rclpy.shutdown() + + +def require_security_env() -> str: + enclave = os.environ.get("ROS_SECURITY_ENCLAVE_OVERRIDE", "/guarded_talker") + if os.environ.get("ROS_SECURITY_ENABLE", "").lower() != "true": + raise RuntimeError("ROS_SECURITY_ENABLE must be 'true' for this demo") + if os.environ.get("ROS_SECURITY_STRATEGY", "") != "Enforce": + raise RuntimeError("ROS_SECURITY_STRATEGY must be 'Enforce'") + keystore = os.environ.get("ROS_SECURITY_KEYSTORE", "") + if not keystore or not os.path.isdir(keystore): + raise RuntimeError(f"ROS_SECURITY_KEYSTORE must point to an initialized keystore, got: {keystore!r}") + return enclave + + +def main() -> None: + gateway_url = os.environ.get("SINT_GATEWAY_URL", "http://localhost:3100").rstrip("/") + enclave = require_security_env() + + agent, token = issue_cmd_vel_token(gateway_url) + + decision = intercept( + gateway_url, + { + "requestId": uuid_v7(), + "timestamp": iso8601_utc_micro(), + "agentId": agent["publicKey"], + "tokenId": token["tokenId"], + "resource": "ros2:///cmd_vel", + "action": "publish", + "params": {"linear": {"x": 0.1}, "angular": {"z": 0.0}}, + "physicalContext": {"currentVelocityMps": 0.1, "humanDetected": False}, + }, + ) + print(f"[guarded_talker] policy decision: {decision.get('action')} ({decision.get('assignedTier')})") + + if decision.get("action") in {"allow", "escalate"}: + publish_via_ros2(0.1, enclave) + print("[guarded_talker] published cmd_vel under enclave", enclave) + else: + print("[guarded_talker] refused to publish: policy denied") + + # Sentinel line that the conformance script greps for: + print("[guarded_talker] OK") + + +if __name__ == "__main__": + main() diff --git a/examples/ros2-transport-hardening/keystore-init.sh b/examples/ros2-transport-hardening/keystore-init.sh new file mode 100755 index 0000000..3467b88 --- /dev/null +++ b/examples/ros2-transport-hardening/keystore-init.sh @@ -0,0 +1,90 @@ +#!/usr/bin/env bash +# Build an SROS2 keystore with enclaves for the hardened demo. +# +# The keystore is a directory of X.509 material that DDS participants use for +# mutual authentication and per-topic access control. When a node starts with +# ROS_SECURITY_ENABLE=true and ROS_SECURITY_STRATEGY=Enforce, rmw refuses to +# join the domain unless its enclave matches. +# +# Enclaves created here: +# /guarded_talker — allowed to publish /cmd_vel +# /guarded_camera — allowed to subscribe /camera/front +# +# The attacker container has NO enclave, so DDS Secure rejects its +# discovery announcements and any publish attempt is dropped at the +# transport layer — proving bypass is not possible at the app layer alone. + +set -euo pipefail + +KEYSTORE="${KEYSTORE:-/keystore}" + +if [[ -f "${KEYSTORE}/public/ca.cert.pem" ]]; then + echo "[keystore] already initialized at ${KEYSTORE}" + exit 0 +fi + +source /opt/ros/humble/setup.bash + +echo "[keystore] creating ${KEYSTORE}" +ros2 security create_keystore "${KEYSTORE}" + +# Enclaves. Each gets its own identity cert + permissions.xml. +ros2 security create_enclave "${KEYSTORE}" /guarded_talker +ros2 security create_enclave "${KEYSTORE}" /guarded_camera + +# Overlay custom permissions: restrict each enclave to its declared topic. +cat > "${KEYSTORE}/enclaves/guarded_talker/permissions.xml" <<'PERM' + + + + CN=/guarded_talker + + 2026-01-01T00:00:00 + 2030-01-01T00:00:00 + + + 0 + + rt/cmd_vel + + + rq/*rr/* + + + DENY + + +PERM + +cat > "${KEYSTORE}/enclaves/guarded_camera/permissions.xml" <<'PERM' + + + + CN=/guarded_camera + + 2026-01-01T00:00:00 + 2030-01-01T00:00:00 + + + 0 + + rt/camera/front + + + DENY + + +PERM + +# Re-sign permissions with the CA so rmw accepts the overlay. +for enclave in guarded_talker guarded_camera; do + openssl smime -sign \ + -in "${KEYSTORE}/enclaves/${enclave}/permissions.xml" \ + -text -out "${KEYSTORE}/enclaves/${enclave}/permissions.p7s" \ + -signer "${KEYSTORE}/public/permissions_ca.cert.pem" \ + -inkey "${KEYSTORE}/private/permissions_ca.key.pem" +done + +echo "[keystore] enclaves signed; ready at ${KEYSTORE}"