Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions examples/ros2-lifecycle-guard/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,10 @@ Expected output includes a short transcript from the ROS 2 container showing:
publishes to ROS topics requires system-level containment (SROS2 enclaves,
network isolation, or a transport-level interceptor). The purpose here is to
make the enforcement-context pattern concrete and runnable.
- For the matching **transport-level** recipe — SROS2 keystore + enclaves,
`ROS_SECURITY_STRATEGY=Enforce`, and a runnable bypass-attempt conformance
check — see [`../ros2-transport-hardening`](../ros2-transport-hardening/).
The two demos are intended to be read together: this one answers "what is
the policy decision surface?"; the other answers "what makes it
non-bypassable at the wire?".

24 changes: 24 additions & 0 deletions examples/ros2-transport-hardening/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
FROM ros:humble-ros-base

SHELL ["/bin/bash", "-lc"]

# SROS2 tooling lives in ros-humble-sros2; openssl is used by keystore-init.sh
# to re-sign custom permissions.xml overlays.
RUN apt-get update \
&& apt-get install -y --no-install-recommends \
python3 \
python3-colcon-common-extensions \
ros-humble-sros2 \
ros-humble-geometry-msgs \
openssl \
ca-certificates \
curl \
&& rm -rf /var/lib/apt/lists/*

WORKDIR /demo
COPY keystore-init.sh guarded_talker.py attacker.py ./
RUN chmod +x keystore-init.sh

# Default to Fast DDS with security plugins enabled (rmw_fastrtps_cpp is
# shipped with ros:humble-ros-base).
ENV RMW_IMPLEMENTATION=rmw_fastrtps_cpp
139 changes: 139 additions & 0 deletions examples/ros2-transport-hardening/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
# ROS 2 Transport-Level Non-Bypass Hardening

This demo is the transport-hardened counterpart to
[`ros2-lifecycle-guard`](../ros2-lifecycle-guard/). That one shows the
**application-level** pattern: a node voluntarily calls
`PolicyGateway.intercept()` before acting. This one shows why that is not
enough on its own, and what to layer underneath so that bypass is not
materially possible.

> **Layered model — both layers are needed.**
>
> | Layer | Enforced by | What it catches | What it cannot catch |
> | --- | --- | --- | --- |
> | Application | `PolicyGateway.intercept()` call in the guarded node | Semantic policy: token, constraints, physical context, tier | A *different* process on the same DDS domain publishing directly |
> | Transport | SROS2 enclaves + DDS Secure (authentication + access control) | Any unauthenticated or unauthorized DDS participant | Application-semantic policy — DDS has no notion of "velocity cap" |
>
> `ros2-lifecycle-guard` covers the top row. This demo adds the bottom row.

## What gets stood up

- `gateway` — the SINT Policy Gateway (app-layer PDP).
- `keystore-init` — one-shot container that builds an SROS2 keystore with:
- `/guarded_talker` enclave: `publish rt/cmd_vel` only (permissions.xml overlay re-signed with the permissions CA).
- `/guarded_camera` enclave: `subscribe rt/camera/front` only.
- `guarded-talker` — ROS 2 node running under the `/guarded_talker` enclave
with `ROS_SECURITY_ENABLE=true ROS_SECURITY_STRATEGY=Enforce`. Every
publish is preceded by an `intercept()` call. This is the reference
*defender*.
- `attacker` — same network, same `ROS_DOMAIN_ID`, Enforce mode, but
**no keystore mount and no enclave**. This is the reference *bypass
attempt*: a compromised process on the same network trying to inject a
`/cmd_vel` command directly.

## Run

```bash
# From this directory:
docker compose up --build -d postgres redis gateway keystore-init guarded-talker
docker compose logs -f guarded-talker # watch policy decisions
docker compose run --rm attacker # reference bypass attempt
```

## Automated conformance check

```bash
./conformance-check.sh
```

The script fails if the attacker container exits `0` (meaning a bypass
publish reached the guarded topic). It passes if the attacker exits `2`
(no matched subscribers after 10s of discovery) or `3` (rmw refused to
create the participant). Both are valid hardening outcomes:

- **Exit 3** is the strongest rejection: the rmw security plugin refuses
participant creation because the attacker has no identity cert.
- **Exit 2** is the next-strongest: the attacker creates a local
participant but DDS Secure discovery never matches it to the guarded
subscriber, so the publish traffic never leaves its own process.

Neither exit code indicates a successful injection into the guarded
topic.

## Key generation recipe (manual reference)

The `keystore-init` container runs `keystore-init.sh`. The inlined
commands are:

```bash
source /opt/ros/humble/setup.bash

# 1. Build the keystore and its internal CAs.
ros2 security create_keystore /keystore

# 2. Create an enclave per trusted node identity.
ros2 security create_enclave /keystore /guarded_talker
ros2 security create_enclave /keystore /guarded_camera

# 3. Overlay custom permissions.xml to restrict each enclave to its
# declared topic, then re-sign with the permissions CA:
openssl smime -sign \
-in /keystore/enclaves/guarded_talker/permissions.xml \
-text -out /keystore/enclaves/guarded_talker/permissions.p7s \
-signer /keystore/public/permissions_ca.cert.pem \
-inkey /keystore/private/permissions_ca.key.pem
```

## Launching a guarded node

Any ROS 2 node can become a guarded node with three environment
variables and a matching enclave:

```bash
export ROS_SECURITY_ENABLE=true
export ROS_SECURITY_STRATEGY=Enforce # NOT Permissive
export ROS_SECURITY_KEYSTORE=/keystore
# Then pass --enclave /guarded_talker when launching.
```

Under `Enforce`, rmw refuses to bring up a participant whose enclave is
missing, expired, or whose permissions.xml disallows the requested
topic — which is exactly the property the attacker container is
exercising.

## Optional: namespace separation + network policies

If you deploy on Kubernetes or a multi-subnet environment, pair SROS2
with one or both of:

- A distinct `ROS_DOMAIN_ID` per trust boundary (each domain is a
separate DDS multicast plane; nodes on different domains cannot even
discover each other).
- A `NetworkPolicy` that restricts DDS ports (default `7400-7500`) to
the pods with a `sint-enclave=<name>` label.

These are defense-in-depth layers on top of SROS2, not replacements for
it.

## Optional: gateway-side endpoint allowlist

For air-gapped or one-way deployments, the Policy Gateway can be
configured to reject `ros2://` resources outside an operator-supplied
allowlist (e.g. `ros2:///cmd_vel`, `ros2:///camera/*`). This is useful
as a belt-and-braces check against tokens being misused against
topics that were never meant to be reachable. See
[`packages/policy-gateway`](../../packages/policy-gateway) for the
policy hook.

## Limitations

- This demo is deliberately small: one talker enclave, one subscribe
enclave, two containers. A real deployment needs one enclave per
distinct node identity and a governance.xml matching the
operator's risk posture.
- A full ROS 2 middleware interceptor (a hard man-in-the-middle that
the node cannot go around) is out of scope — see the non-goals in
issue #161.
- SROS2 only covers DDS traffic. If your agent has *other* I/O (HTTP
clients, file system, subprocess) you need a complementary sandbox
(e.g. seccomp, AppArmor, or the `NemoClaw` runtime).
77 changes: 77 additions & 0 deletions examples/ros2-transport-hardening/attacker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
"""Unauthorized attacker: attempts to publish /cmd_vel without going through
the policy gateway and without an enclave that allows it.

Expected behavior under the hardened configuration:
* ROS_SECURITY_ENABLE=true + Enforce means the DDS participant has no
credentials — discovery drops it, no publish ever lands on the guarded
side.
* The script still calls `publish()` and then waits, but no subscriber
will match because mutual auth fails. We detect this by counting the
number of matched subscribers after a brief wait.

Exit codes (consumed by conformance-check.sh):
0 — attacker successfully published (HARDENING FAILED, should never
happen under Enforce).
2 — attacker could not find any matched subscriber (EXPECTED).
3 — attacker could not even initialize an authenticated participant
(EXPECTED; strongest form of rejection).
"""

from __future__ import annotations

import os
import sys
import time


def main() -> int:
# Explicitly NOT routing through the Policy Gateway. We are simulating a
# compromised process on the same network trying to inject commands.
try:
import rclpy
from geometry_msgs.msg import Twist
except Exception as exc:
print(f"[attacker] rclpy import failed: {exc}", file=sys.stderr)
return 3

try:
rclpy.init(args=sys.argv)
except Exception as exc:
# When security is enforced and we lack credentials, rmw rejects
# participant creation outright. This is the strongest rejection.
print(f"[attacker] rclpy.init refused by rmw: {exc}", file=sys.stderr)
print("[attacker] BLOCKED at participant creation")
return 3

try:
# No enclave → defaults to /, which has no permissions in our
# keystore, so the participant is either refused or isolated.
node = rclpy.create_node("bypass_attacker")
pub = node.create_publisher(Twist, "cmd_vel", 10)

deadline = time.time() + 10.0
while time.time() < deadline:
rclpy.spin_once(node, timeout_sec=0.2)
if pub.get_subscription_count() > 0:
# If we see a matched subscriber, try to publish and then
# surface the count so the conformance check can assert.
msg = Twist()
msg.linear.x = 1.0
pub.publish(msg)
print(f"[attacker] matched_subscribers={pub.get_subscription_count()} — BYPASS SUCCEEDED")
node.destroy_node()
return 0

matched = pub.get_subscription_count()
print(f"[attacker] matched_subscribers={matched} after 10s — BLOCKED by transport")
node.destroy_node()
return 2
finally:
try:
rclpy.shutdown()
except Exception:
pass


if __name__ == "__main__":
raise SystemExit(main())
74 changes: 74 additions & 0 deletions examples/ros2-transport-hardening/conformance-check.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#!/usr/bin/env bash
# Conformance check for transport-level non-bypass hardening.
#
# Orchestration (run from this directory):
# 1. Build and start all services via docker compose.
# 2. Wait for the guarded_talker to log "[guarded_talker] OK".
# 3. Read the attacker container's exit code.
#
# Pass criteria:
# - guarded_talker logged OK (policy + transport round-trip worked)
# - attacker exit code ∈ {2, 3}
# (blocked at either subscriber-match or participant-creation stage)
#
# Fail criteria:
# - attacker exit code == 0 (bypass succeeded — regression)
# - guarded_talker never reached OK (demo itself is broken)

set -euo pipefail

HERE="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
cd "${HERE}"

COMPOSE=(docker compose -f docker-compose.yml)

cleanup() {
"${COMPOSE[@]}" down -v --remove-orphans >/dev/null 2>&1 || true
}
trap cleanup EXIT

echo "[conformance] building images"
"${COMPOSE[@]}" build

echo "[conformance] starting infra + guarded services"
"${COMPOSE[@]}" up -d postgres redis gateway keystore-init
"${COMPOSE[@]}" up -d guarded-talker

echo "[conformance] waiting for guarded_talker OK sentinel"
deadline=$(( SECONDS + 90 ))
while (( SECONDS < deadline )); do
if "${COMPOSE[@]}" logs guarded-talker 2>/dev/null | grep -q "\[guarded_talker\] OK"; then
echo "[conformance] guarded_talker reached OK"
break
fi
sleep 2
done

if ! "${COMPOSE[@]}" logs guarded-talker 2>/dev/null | grep -q "\[guarded_talker\] OK"; then
echo "[conformance] FAIL: guarded_talker never logged OK" >&2
"${COMPOSE[@]}" logs guarded-talker
exit 1
fi

echo "[conformance] running attacker (expect blocked)"
set +e
"${COMPOSE[@]}" run --rm attacker
attacker_exit=$?
set -e

echo "[conformance] attacker exit code: ${attacker_exit}"

case "${attacker_exit}" in
2|3)
echo "[conformance] PASS: attacker blocked at transport (exit ${attacker_exit})"
exit 0
;;
0)
echo "[conformance] FAIL: attacker bypass succeeded" >&2
exit 1
;;
*)
echo "[conformance] FAIL: unexpected attacker exit ${attacker_exit}" >&2
exit 1
;;
esac
Loading
Loading