diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index aefaa9e..5871c14 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -32,7 +32,7 @@ jobs: python -m pip install --upgrade pip pip install --only-binary eclipse-zenoh -e "packages/device-connect-edge[dev]" - name: Run unit tests - run: pytest packages/device-connect-edge/tests/ -v --timeout=30 --tb=short + run: pytest packages/device-connect-edge/tests/ -v --timeout=30 --tb=short --ignore=packages/device-connect-edge/tests/fuzz unit-tests-server: runs-on: ubuntu-latest @@ -49,7 +49,7 @@ jobs: pip install --only-binary eclipse-zenoh -e packages/device-connect-edge pip install --only-binary eclipse-zenoh -e "packages/device-connect-server[all]" - name: Run unit tests - run: pytest packages/device-connect-server/tests/ -v --timeout=30 --tb=short + run: pytest packages/device-connect-server/tests/ -v --timeout=30 --tb=short --ignore=packages/device-connect-server/tests/fuzz unit-tests-agent-tools: runs-on: ubuntu-latest @@ -86,6 +86,73 @@ jobs: ruff check packages/device-connect-agent-tools/ ruff check tests/ + # ── Fuzz tests (all packages, single report) ──────────────────── + fuzz-tests-hypothesis: + runs-on: ubuntu-latest + timeout-minutes: 15 + steps: + - uses: actions/checkout@v6 + - uses: actions/setup-python@v6 + with: + python-version: ${{ env.PYTHON_VERSION }} + cache: 'pip' + - name: Install all packages + run: | + python -m pip install --upgrade pip + pip install --only-binary eclipse-zenoh -e "packages/device-connect-edge[dev,fuzz]" + pip install --only-binary eclipse-zenoh -e "packages/device-connect-server[dev,fuzz]" + pip install --only-binary eclipse-zenoh -e "packages/device-connect-agent-tools[dev,fuzz]" + - name: Run hypothesis fuzz tests (all packages) + env: + HYPOTHESIS_PROFILE: ci + run: | + pytest \ + packages/device-connect-edge/tests/fuzz/test_fuzz_*.py \ + packages/device-connect-server/tests/fuzz/test_fuzz_*.py \ + packages/device-connect-agent-tools/tests/fuzz/test_fuzz_*.py \ + -v --timeout=120 --tb=short --junitxml=fuzz-hypothesis-results.xml + - name: Publish hypothesis findings summary + if: always() + run: python packages/device-connect-edge/tests/fuzz/report_hypothesis.py fuzz-hypothesis-results.xml >> "$GITHUB_STEP_SUMMARY" + - name: Upload hypothesis report + if: always() + uses: actions/upload-artifact@v5 + with: + name: fuzz-hypothesis-results + path: fuzz-hypothesis-results.xml + retention-days: 30 + + fuzz-tests-atheris: + runs-on: ubuntu-latest + timeout-minutes: 20 + steps: + - uses: actions/checkout@v6 + - uses: actions/setup-python@v6 + with: + python-version: ${{ env.PYTHON_VERSION }} + cache: 'pip' + - name: Install all packages + run: | + python -m pip install --upgrade pip + pip install --only-binary eclipse-zenoh -e "packages/device-connect-edge[dev,fuzz]" + pip install --only-binary eclipse-zenoh -e "packages/device-connect-server[dev,fuzz]" + pip install --only-binary eclipse-zenoh -e "packages/device-connect-agent-tools[dev,fuzz]" + pip install atheris + - name: Run atheris fuzz targets (all packages) + run: python packages/device-connect-edge/tests/fuzz/run_atheris.py --iterations=50000 + - name: Publish atheris findings summary + if: always() + run: cat atheris-report.md >> "$GITHUB_STEP_SUMMARY" + - name: Upload atheris crashes and report + if: always() + uses: actions/upload-artifact@v5 + with: + name: fuzz-atheris-results + path: | + crash-* + atheris-report.md + retention-days: 30 + # ── Tier 1: Integration tests (no LLM) ──────────────────────── integration-tests: runs-on: ubuntu-latest diff --git a/.gitignore b/.gitignore index fe429c4..2ca8ddc 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,16 @@ build/ *.egg .venv/ .vscode/ +.DS_Store .env .pytest_cache/ .mypy_cache/ + +# Fuzz testing artifacts +crash-* +timeout-* +htmlcov/ +.hypothesis/ +atheris-report.md +fuzz-hypothesis-results.xml +**/fuzz/findings/ diff --git a/packages/device-connect-agent-tools/device_connect_agent_tools/connection.py b/packages/device-connect-agent-tools/device_connect_agent_tools/connection.py index 7b22c51..19a8185 100644 --- a/packages/device-connect-agent-tools/device_connect_agent_tools/connection.py +++ b/packages/device-connect-agent-tools/device_connect_agent_tools/connection.py @@ -136,6 +136,38 @@ def flatten_device(raw: Dict[str, Any]) -> Dict[str, Any]: } +# ── Message parsing helpers (extracted for testability) ───────────── + + +def parse_buffered_payload(data: bytes) -> dict: + """Parse raw bytes into a payload dict for buffered subscriptions. + + Always returns a dict — falls back to ``{"raw": ...}`` on any error. + """ + try: + payload = json.loads(data.decode()) + if not isinstance(payload, dict): + payload = {"raw": str(payload)[:500]} + except Exception: + payload = {"raw": data.decode("utf-8", errors="replace")[:500]} + return payload + + +def parse_event_payload(data: bytes) -> dict: + """Parse raw bytes into a normalized event dict. + + Returns dict with keys ``device_id``, ``event_name``, ``params``. + Raises on malformed input (caller should catch). + """ + payload = json.loads(data.decode()) + if not isinstance(payload, dict): + raise ValueError("Expected JSON object") + method = payload.get("method", "") + dev_id = payload.get("params", {}).get("device_id", "unknown") + params = payload.get("params", {}) + return {"device_id": dev_id, "event_name": method, "params": params} + + # ── Connection class ──────────────────────────────────────────────── @@ -397,10 +429,7 @@ def subscribe_buffered( async def _do_subscribe(): async def _on_msg(data: bytes, msg_subject: str, reply: str = ""): - try: - payload = json.loads(data.decode()) - except Exception: - payload = {"raw": data.decode()[:500]} + payload = parse_buffered_payload(data) # Store as (subject, data) tuple self._inbox[name].append((msg_subject, payload)) # Trim to prevent unbounded growth @@ -498,22 +527,13 @@ async def subscribe_events( async def _on_msg(data: bytes, reply: str = ""): try: - payload = json.loads(data.decode()) - method = payload.get("method", "") - dev_id = payload.get("params", {}).get("device_id", "unknown") - event_name = method - params = payload.get("params", {}) - + event = parse_event_payload(data) logger.info( "EVENT <- %s::%s %s", - dev_id, event_name, - json.dumps(params, default=str), + event["device_id"], event["event_name"], + json.dumps(event["params"], default=str), ) - await buffer.put({ - "device_id": dev_id, - "event_name": event_name, - "params": params, - }) + await buffer.put(event) except Exception as e: logger.error("Error parsing event: %s", e) diff --git a/packages/device-connect-agent-tools/pyproject.toml b/packages/device-connect-agent-tools/pyproject.toml index 8cc4331..f55f296 100644 --- a/packages/device-connect-agent-tools/pyproject.toml +++ b/packages/device-connect-agent-tools/pyproject.toml @@ -37,6 +37,10 @@ dev = [ "pytest-asyncio>=0.23", "pytest-timeout>=2.0", ] +fuzz = [ + "hypothesis>=6.0", + "coverage>=7.0", +] [tool.pytest.ini_options] asyncio_mode = "strict" @@ -44,6 +48,9 @@ markers = [ "integration: requires live Device Connect infrastructure (NATS/Zenoh + registry)", ] +[tool.setuptools.packages.find] +include = ["device_connect_agent_tools*"] + [project.urls] Homepage = "https://github.com/arm/device-connect" Repository = "https://github.com/arm/device-connect.git" diff --git a/packages/device-connect-agent-tools/tests/fuzz/README.md b/packages/device-connect-agent-tools/tests/fuzz/README.md new file mode 100644 index 0000000..d836b65 --- /dev/null +++ b/packages/device-connect-agent-tools/tests/fuzz/README.md @@ -0,0 +1,53 @@ +# Fuzz Testing for Device Connect Agent Tools + +Fuzz tests for the agent-tools package using [Hypothesis](https://hypothesis.readthedocs.io/) (property-based, pytest-integrated) and [Atheris](https://github.com/google/atheris) (coverage-guided, libFuzzer-based). + +For full setup instructions (installing atheris on macOS/Linux, deep fuzzing, CI integration), see the [edge fuzz README](../../device-connect-edge/tests/fuzz/README.md). + +## Fuzz Targets + +| Target | Hypothesis | Atheris | What it tests | +|--------|-----------|---------|---------------| +| Tool Name Parsing | `test_fuzz_schema.py` | `fuzz_schema.py` | `parse_tool_name()` — MCP tool name splitting on `::` delimiter | +| JSON-RPC Parsing | `test_fuzz_jsonrpc_parsing.py` | `fuzz_jsonrpc_parsing.py` | Buffered message and event message parsing from `connection.py` | + +## Running Locally + +### Hypothesis (pytest) + +```bash +cd packages/device-connect-agent-tools +pip install -e ".[dev,fuzz]" + +# Run all fuzz tests — findings shown in terminal output +pytest tests/fuzz/test_fuzz_*.py -v + +# More examples for deeper coverage +HYPOTHESIS_PROFILE=ci pytest tests/fuzz/test_fuzz_*.py -v +``` + +### Atheris + +```bash +pip install atheris # Linux: works directly. macOS: see edge README for LLVM setup. + +# Run individual targets directly +cd packages/device-connect-agent-tools +python tests/fuzz/fuzz_schema.py tests/fuzz/corpus/tool_names/ -max_total_time=300 +python tests/fuzz/fuzz_jsonrpc_parsing.py tests/fuzz/corpus/jsonrpc_messages/ -max_total_time=300 + +# Deep fuzzing (1 hour per target, run indefinitely with no flags) +python tests/fuzz/fuzz_schema.py tests/fuzz/corpus/tool_names/ -max_total_time=3600 + +# Run all targets across ALL packages (from repo root) — writes atheris-report.md +python packages/device-connect-edge/tests/fuzz/run_atheris.py --iterations=50000 +``` + +### Where to find results + +| Tool | Where | +|------|-------| +| Hypothesis | Terminal output from pytest | +| Atheris (unified runner) | `atheris-report.md` at repo root | +| Atheris (direct) | `crash-` files in current directory | +| CI | GitHub Actions job summary (scroll down on run page) | diff --git a/packages/device-connect-agent-tools/tests/fuzz/conftest.py b/packages/device-connect-agent-tools/tests/fuzz/conftest.py new file mode 100644 index 0000000..25a2fdb --- /dev/null +++ b/packages/device-connect-agent-tools/tests/fuzz/conftest.py @@ -0,0 +1,19 @@ +"""Hypothesis configuration for agent-tools fuzz tests.""" + +import os + +from hypothesis import settings, HealthCheck + +settings.register_profile( + "default", + max_examples=5000, + suppress_health_check=[HealthCheck.too_slow], +) + +settings.register_profile( + "ci", + max_examples=20000, + suppress_health_check=[HealthCheck.too_slow], +) + +settings.load_profile(os.getenv("HYPOTHESIS_PROFILE", "default")) diff --git a/packages/device-connect-agent-tools/tests/fuzz/corpus/jsonrpc_messages/valid_event.json b/packages/device-connect-agent-tools/tests/fuzz/corpus/jsonrpc_messages/valid_event.json new file mode 100644 index 0000000..e852c4c --- /dev/null +++ b/packages/device-connect-agent-tools/tests/fuzz/corpus/jsonrpc_messages/valid_event.json @@ -0,0 +1 @@ +{"jsonrpc":"2.0","method":"event/objectDetected","params":{"device_id":"cam-01","label":"person","confidence":0.95}} \ No newline at end of file diff --git a/packages/device-connect-agent-tools/tests/fuzz/corpus/jsonrpc_messages/valid_response.json b/packages/device-connect-agent-tools/tests/fuzz/corpus/jsonrpc_messages/valid_response.json new file mode 100644 index 0000000..bbb9af8 --- /dev/null +++ b/packages/device-connect-agent-tools/tests/fuzz/corpus/jsonrpc_messages/valid_response.json @@ -0,0 +1 @@ +{"jsonrpc":"2.0","id":"req-001","result":{"status":"ok","data":[1,2,3]}} \ No newline at end of file diff --git a/packages/device-connect-agent-tools/tests/fuzz/corpus/tool_names/valid.txt b/packages/device-connect-agent-tools/tests/fuzz/corpus/tool_names/valid.txt new file mode 100644 index 0000000..cb699e9 --- /dev/null +++ b/packages/device-connect-agent-tools/tests/fuzz/corpus/tool_names/valid.txt @@ -0,0 +1 @@ +camera-01::captureImage \ No newline at end of file diff --git a/packages/device-connect-agent-tools/tests/fuzz/fuzz_jsonrpc_parsing.py b/packages/device-connect-agent-tools/tests/fuzz/fuzz_jsonrpc_parsing.py new file mode 100644 index 0000000..d623cf6 --- /dev/null +++ b/packages/device-connect-agent-tools/tests/fuzz/fuzz_jsonrpc_parsing.py @@ -0,0 +1,38 @@ +"""Atheris fuzz target: JSON-RPC message parsing from connection layer. + +Run: + python tests/fuzz/fuzz_jsonrpc_parsing.py tests/fuzz/corpus/jsonrpc_messages/ -max_total_time=300 +""" + +import sys +import json + +import atheris + +with atheris.instrument_imports(): + from device_connect_agent_tools.connection import ( + parse_buffered_payload, + parse_event_payload, + ) + + +def TestOneInput(data: bytes) -> None: + # Always test buffered parsing (never crashes) + result = parse_buffered_payload(data) + assert isinstance(result, dict) + + # Test event parsing (may fail on non-JSON) + try: + result = parse_event_payload(data) + assert isinstance(result, dict) + except (json.JSONDecodeError, UnicodeDecodeError, ValueError): + pass + + +def main(): + atheris.Setup(sys.argv, TestOneInput) + atheris.Fuzz() + + +if __name__ == "__main__": + main() diff --git a/packages/device-connect-agent-tools/tests/fuzz/fuzz_schema.py b/packages/device-connect-agent-tools/tests/fuzz/fuzz_schema.py new file mode 100644 index 0000000..e412427 --- /dev/null +++ b/packages/device-connect-agent-tools/tests/fuzz/fuzz_schema.py @@ -0,0 +1,35 @@ +"""Atheris fuzz target: MCP tool name parsing. + +Run: + python tests/fuzz/fuzz_schema.py fuzz/corpus/tool_names/ -max_total_time=300 +""" + +import sys + +import atheris + +with atheris.instrument_imports(): + from device_connect_agent_tools.mcp.schema import parse_tool_name + + +def TestOneInput(data: bytes) -> None: + try: + text = data.decode("utf-8", errors="replace") + except Exception: + return + + try: + result = parse_tool_name(text) + assert isinstance(result, tuple) + assert len(result) == 2 + except ValueError: + pass + + +def main(): + atheris.Setup(sys.argv, TestOneInput) + atheris.Fuzz() + + +if __name__ == "__main__": + main() diff --git a/packages/device-connect-agent-tools/tests/fuzz/test_fuzz_jsonrpc_parsing.py b/packages/device-connect-agent-tools/tests/fuzz/test_fuzz_jsonrpc_parsing.py new file mode 100644 index 0000000..3b0d9c2 --- /dev/null +++ b/packages/device-connect-agent-tools/tests/fuzz/test_fuzz_jsonrpc_parsing.py @@ -0,0 +1,60 @@ +"""Property-based fuzz tests for JSON-RPC message parsing in connection layer. + +Run: + pytest tests/fuzz/test_fuzz_jsonrpc_parsing.py -v +""" + +import json + +from hypothesis import given, settings, HealthCheck +from hypothesis import strategies as st + +from device_connect_agent_tools.connection import ( + parse_buffered_payload, + parse_event_payload, +) + + +@given(data=st.binary(max_size=4096)) +@settings(max_examples=5000, suppress_health_check=[HealthCheck.too_slow]) +def test_buffered_message_parsing_never_crashes(data): + """Buffered message parsing must always return a dict.""" + result = parse_buffered_payload(data) + assert isinstance(result, dict) + + +@given(data=st.binary(max_size=4096)) +@settings(max_examples=5000, suppress_health_check=[HealthCheck.too_slow]) +def test_event_parsing_handles_arbitrary_bytes(data): + """Event parsing should handle or reject arbitrary bytes gracefully.""" + try: + result = parse_event_payload(data) + assert isinstance(result, dict) + assert "device_id" in result + assert "event_name" in result + except (json.JSONDecodeError, UnicodeDecodeError, ValueError): + pass + + +json_primitives = st.one_of( + st.none(), st.booleans(), st.integers(), st.text(max_size=100), + st.floats(allow_nan=False, allow_infinity=False), +) + +json_values = st.recursive( + json_primitives, + lambda children: st.one_of( + st.lists(children, max_size=3), + st.dictionaries(st.text(max_size=20), children, max_size=5), + ), + max_leaves=10, +) + + +@given(payload=st.dictionaries(st.text(max_size=30), json_values, max_size=8)) +@settings(max_examples=5000, suppress_health_check=[HealthCheck.too_slow]) +def test_event_parsing_structured_input(payload): + """Event parsing with structured JSON dicts must not crash.""" + data = json.dumps(payload).encode() + result = parse_event_payload(data) + assert isinstance(result, dict) diff --git a/packages/device-connect-agent-tools/tests/fuzz/test_fuzz_schema.py b/packages/device-connect-agent-tools/tests/fuzz/test_fuzz_schema.py new file mode 100644 index 0000000..c59ea69 --- /dev/null +++ b/packages/device-connect-agent-tools/tests/fuzz/test_fuzz_schema.py @@ -0,0 +1,40 @@ +"""Property-based fuzz tests for MCP tool name parsing. + +Run: + pytest tests/fuzz/test_fuzz_schema.py -v +""" + +from hypothesis import given, settings +from hypothesis import strategies as st + +from device_connect_agent_tools.mcp.schema import parse_tool_name + + +@given(name=st.text(max_size=500)) +@settings(max_examples=5000) +def test_parse_tool_name_never_crashes(name): + """parse_tool_name must either return a tuple or raise ValueError.""" + try: + result = parse_tool_name(name) + assert isinstance(result, tuple) + assert len(result) == 2 + assert isinstance(result[0], str) + assert isinstance(result[1], str) + except ValueError: + pass + + +@given( + device_id=st.text(min_size=1, max_size=100), + function_name=st.text(min_size=1, max_size=100), +) +@settings(max_examples=3000) +def test_parse_tool_name_roundtrip(device_id, function_name): + """Valid device_id::function_name should always parse correctly.""" + # Skip if device_id contains : (would form ambiguous :: boundaries) + if ":" in device_id: + return + tool_name = f"{device_id}::{function_name}" + dev, func = parse_tool_name(tool_name) + assert dev == device_id + assert func == function_name diff --git a/packages/device-connect-edge/device_connect_edge/messaging/config.py b/packages/device-connect-edge/device_connect_edge/messaging/config.py index 79cc1b2..a791f70 100644 --- a/packages/device-connect-edge/device_connect_edge/messaging/config.py +++ b/packages/device-connect-edge/device_connect_edge/messaging/config.py @@ -139,6 +139,9 @@ def _load_credentials_file(filepath: str) -> Dict[str, Any]: with open(path, "r") as f: data = json.load(f) + if not isinstance(data, dict): + return {} + # Extract NATS credentials if nested if "nats" in data: nats_config = data["nats"] diff --git a/packages/device-connect-edge/pyproject.toml b/packages/device-connect-edge/pyproject.toml index bd275a4..57921dc 100644 --- a/packages/device-connect-edge/pyproject.toml +++ b/packages/device-connect-edge/pyproject.toml @@ -46,6 +46,10 @@ dev = [ "pytest-timeout>=2.0", "device-connect-edge[telemetry]", ] +fuzz = [ + "hypothesis>=6.0", + "coverage>=7.0", +] [project.urls] Homepage = "https://github.com/arm/device-connect" diff --git a/packages/device-connect-edge/tests/fuzz/README.md b/packages/device-connect-edge/tests/fuzz/README.md new file mode 100644 index 0000000..7dd44d9 --- /dev/null +++ b/packages/device-connect-edge/tests/fuzz/README.md @@ -0,0 +1,288 @@ +# Fuzz Testing for Device Connect Edge + +This directory contains fuzz tests for the Device Connect Edge SDK using two complementary tools: + +- **[Hypothesis](https://hypothesis.readthedocs.io/)** — Property-based fuzzing, integrated with pytest. Works on all platforms out of the box. +- **[Atheris](https://github.com/google/atheris)** — Coverage-guided fuzzing powered by libFuzzer. Best for deep, long-running fuzz campaigns. + +## Fuzz Targets + +| Target | Hypothesis (pytest) | Atheris | What it tests | +|--------|-------------------|---------|---------------| +| JSON-RPC commands | `test_fuzz_jsonrpc_cmd.py` | `fuzz_jsonrpc_cmd.py` | `DeviceRuntime._cmd_subscription()` parsing — JSON-RPC decoding, method/id/params extraction, `_dc_meta` trace context | +| NATS credentials | `test_fuzz_nats_creds.py` | `fuzz_nats_creds.py` | `MessagingConfig._parse_nats_creds_file()` — manual string parsing with `.find()` and slicing | +| Pydantic models | `test_fuzz_pydantic_models.py` | `fuzz_pydantic_models.py` | `DeviceIdentity`, `DeviceStatus`, `DeviceCapabilities`, `FunctionDef`, `EventDef` validation | +| Credentials JSON | `test_fuzz_credentials_json.py` | `fuzz_credentials_json.py` | `MessagingConfig._load_credentials_file()` — JSON credential loading with `.creds` fallback | + +--- + +## Setup + +### Prerequisites + +- Python >= 3.11 +- pip + +### Install Hypothesis (all platforms) + +```bash +cd packages/device-connect-edge +pip install -e ".[dev,fuzz]" +``` + +This installs `hypothesis` and `coverage`. You can now run all `test_fuzz_*.py` tests. + +### Install Atheris + +Atheris requires a Clang compiler with libFuzzer support. Setup differs by platform. + +#### Linux + +```bash +pip install atheris +``` + +On most Linux distributions, the system Clang includes libFuzzer and atheris installs directly. + +If it fails, install a newer Clang: + +```bash +# Ubuntu/Debian +sudo apt-get install clang + +# Then retry +pip install atheris +``` + +#### macOS + +Apple Clang does **not** include libFuzzer. You need LLVM from Homebrew: + +```bash +# Step 1: Install LLVM (one-time, ~2-3 min via bottle) +brew install llvm + +# Step 2: Install atheris using Homebrew's clang +CLANG_BIN="/opt/homebrew/opt/llvm/bin/clang" pip install atheris +``` + +> **Note**: If Homebrew LLVM doesn't include `libclang_rt.fuzzer_osx.a` (you can check +> with `find /opt/homebrew/opt/llvm -name "*fuzzer*"`), you'll need to build LLVM from +> source instead: +> +> ```bash +> git clone https://github.com/llvm/llvm-project.git +> cd llvm-project && mkdir build && cd build +> cmake -DLLVM_ENABLE_PROJECTS='clang;compiler-rt' -G "Unix Makefiles" ../llvm +> make -j $(sysctl -n hw.ncpu) +> CLANG_BIN="$(pwd)/bin/clang" pip install atheris +> ``` +> +> The cloned `llvm-project/` directory can be deleted after atheris is installed. +> Nothing is installed system-wide; your system Clang and Xcode are unaffected. + +#### Verify installation + +```bash +python -c "import atheris; print('atheris', atheris.__version__, '- OK')" +``` + +--- + +## Running Hypothesis Tests (Recommended Starting Point) + +Hypothesis tests run via pytest and work on any platform. + +```bash +cd packages/device-connect-edge + +# Run all fuzz tests +pytest tests/fuzz/test_fuzz_*.py -v + +# Run a specific target +pytest tests/fuzz/test_fuzz_jsonrpc_cmd.py -v + +# Reproducible run with a fixed seed +pytest tests/fuzz/test_fuzz_jsonrpc_cmd.py -v --hypothesis-seed=0 + +# Run more examples for deeper coverage +HYPOTHESIS_PROFILE=ci pytest tests/fuzz/test_fuzz_*.py -v +``` + +### Hypothesis Profiles + +Two profiles are configured in `tests/fuzz/conftest.py`: + +| Profile | Examples per test | Use case | +|---------|------------------|----------| +| `default` | 5,000 | Local development | +| `ci` | 20,000 | CI pipelines, thorough runs | + +Select a profile with: `HYPOTHESIS_PROFILE=ci` + +--- + +## Running Atheris (Deep Fuzzing) + +Atheris is best for long-running, coverage-guided campaigns that discover deeper bugs. + +```bash +cd packages/device-connect-edge + +# Quick smoke test (1,000 iterations) +python tests/fuzz/fuzz_jsonrpc_cmd.py -atheris_runs=1000 + +# 5-minute run with seed corpus +python tests/fuzz/fuzz_jsonrpc_cmd.py tests/fuzz/corpus/jsonrpc_cmd/ -max_total_time=300 + +# Run all targets (5 min each) +python tests/fuzz/fuzz_jsonrpc_cmd.py tests/fuzz/corpus/jsonrpc_cmd/ -max_total_time=300 +python tests/fuzz/fuzz_nats_creds.py tests/fuzz/corpus/nats_creds/ -max_total_time=300 +python tests/fuzz/fuzz_pydantic_models.py tests/fuzz/corpus/pydantic_models/ -max_total_time=300 +python tests/fuzz/fuzz_credentials_json.py tests/fuzz/corpus/credentials_json/ -max_total_time=300 + +# Run with coverage report +python -m coverage run tests/fuzz/fuzz_jsonrpc_cmd.py -atheris_runs=100000 +python -m coverage html +open htmlcov/index.html +``` + +### Deep Fuzzing (recommended for thorough testing) + +The commands above are quick smoke tests. For thorough bug discovery comparable to +an AFL campaign, run atheris for **hours** using `-max_total_time` (in seconds): + +```bash +# 1 hour per target +python tests/fuzz/fuzz_jsonrpc_cmd.py tests/fuzz/corpus/jsonrpc_cmd/ -max_total_time=3600 + +# 8 hours overnight +python tests/fuzz/fuzz_credentials_json.py tests/fuzz/corpus/credentials_json/ -max_total_time=28800 + +# Run indefinitely until you Ctrl+C (like AFL) +python tests/fuzz/fuzz_jsonrpc_cmd.py tests/fuzz/corpus/jsonrpc_cmd/ +``` + +**`-atheris_runs` vs `-max_total_time`**: `-atheris_runs=50000` caps iterations and +finishes in seconds — useful for CI gates. `-max_total_time=3600` runs for a fixed +duration regardless of iteration count — useful for deep fuzzing. Without either flag, +atheris runs **indefinitely** until interrupted, just like AFL. + +**Why short runs find less**: Python fuzzing is slower than C-based AFL (~1,000–7,000 +exec/s vs 10,000–50,000 exec/s). Short iteration-capped runs are smoke tests, not +deep campaigns. Run for hours on a dedicated machine for best results. + +### Atheris Results + +- **Crashes** are saved as `crash-` files in the current directory +- **Timeouts** are saved as `timeout-` files +- **Reproduce** a crash: `python tests/fuzz/fuzz_jsonrpc_cmd.py crash-` + +### Auto-generated corpus cleanup + +Atheris writes new corpus entries into the corpus directory as it discovers new +coverage paths. When using `tests/fuzz/run_atheris.py`, this is handled automatically — +seeds are copied into a temp directory that is cleaned up after each target runs. + +When running atheris harnesses directly, pass a **temporary directory** instead of the +seed corpus to avoid polluting it: + +```bash +# Copy seeds to a temp dir, run against it, then delete +cp -r tests/fuzz/corpus/jsonrpc_cmd/ /tmp/fuzz-corpus +python tests/fuzz/fuzz_jsonrpc_cmd.py /tmp/fuzz-corpus -max_total_time=300 +rm -rf /tmp/fuzz-corpus +``` + +--- + +## Seed Corpus + +The `tests/fuzz/corpus/` directory contains valid example inputs that atheris mutates to find edge cases: + +``` +tests/fuzz/corpus/ +├── jsonrpc_cmd/ # Valid JSON-RPC command messages +├── nats_creds/ # NATS .creds file samples +├── pydantic_models/ # Valid Pydantic model JSON +└── credentials_json/ # JSON credentials files +``` + +Hypothesis generates its own inputs from strategies and does not use the seed corpus. + +--- + +## Adding New Fuzz Targets + +1. **Identify a parsing function** that processes external input (network messages, files, configs) +2. **Create a hypothesis test** in `tests/fuzz/test_fuzz_.py`: + ```python + from hypothesis import given, settings + from hypothesis import strategies as st + + @given(data=st.binary(max_size=4096)) + @settings(max_examples=5000) + def test_target_never_crashes(data): + try: + your_parser(data) + except (ExpectedException1, ExpectedException2): + pass # Expected rejections — not bugs + ``` +3. **Create an atheris harness** in `tests/fuzz/fuzz_.py`: + ```python + import atheris, sys + + with atheris.instrument_imports(): + from your_module import your_parser + + def TestOneInput(data: bytes) -> None: + try: + your_parser(data) + except (ExpectedException1, ExpectedException2): + pass + + atheris.Setup(sys.argv, TestOneInput) + atheris.Fuzz() + ``` +4. **Add seed inputs** to `tests/fuzz/corpus//` + +### Guidelines + +- Only catch **expected** exceptions — unexpected ones are bugs worth investigating +- Seed corpus should contain valid inputs; the fuzzers mutate them to find edge cases +- Use `FuzzedDataProvider` (atheris) or structured strategies (hypothesis) for typed input +- Run atheris for hours/days on a dedicated machine for best results + +--- + +## CI Integration + +Fuzz tests run automatically in GitHub Actions (`.github/workflows/ci.yml`) as two parallel jobs: + +### `fuzz-tests-hypothesis` + +- Runs on every push/PR to `main` +- Uses the `ci` profile (20,000 examples per test via `HYPOTHESIS_PROFILE=ci`) +- Runs all `tests/fuzz/test_fuzz_*.py` tests via pytest +- No special dependencies — works on `ubuntu-latest` out of the box + +### `fuzz-tests-atheris` + +- Runs on every push/PR to `main` +- Runs each atheris target for 50,000 iterations +- On `ubuntu-latest`, `pip install atheris` works directly (no LLVM setup needed) + +Both jobs run in parallel with unit tests and do not block integration tests. + +### Where findings are published + +Findings from both tools are published to the **GitHub Actions job summary** — visible on +the Actions tab under each run's **Summary** section (scroll down past the job list). + +**Hypothesis**: pytest produces a JUnit XML report, which `tests/fuzz/report_hypothesis.py` converts +to a markdown summary showing pass/fail counts and expandable tracebacks for each failure. +The JUnit XML is also uploaded as an artifact (retained 30 days). + +**Atheris**: `tests/fuzz/run_atheris.py` runs all targets and generates `atheris-report.md` with a +results table and expandable crash details. Both the report and any `crash-*` files are +uploaded as artifacts (retained 30 days). diff --git a/packages/device-connect-edge/tests/fuzz/conftest.py b/packages/device-connect-edge/tests/fuzz/conftest.py new file mode 100644 index 0000000..24f9a4d --- /dev/null +++ b/packages/device-connect-edge/tests/fuzz/conftest.py @@ -0,0 +1,27 @@ +"""Hypothesis configuration for fuzz tests. + +Profiles: 'default' for local dev, 'ci' for thorough CI runs. + +Select profile via env var: + HYPOTHESIS_PROFILE=ci pytest tests/fuzz/test_fuzz_*.py +""" + +import os + +from hypothesis import settings, HealthCheck + +# Default profile: fast iteration during local development +settings.register_profile( + "default", + max_examples=5000, + suppress_health_check=[HealthCheck.too_slow], +) + +# CI profile: more examples for thorough coverage +settings.register_profile( + "ci", + max_examples=20000, + suppress_health_check=[HealthCheck.too_slow], +) + +settings.load_profile(os.getenv("HYPOTHESIS_PROFILE", "default")) diff --git a/packages/device-connect-edge/tests/fuzz/corpus/credentials_json/valid_flat.json b/packages/device-connect-edge/tests/fuzz/corpus/credentials_json/valid_flat.json new file mode 100644 index 0000000..088d0ab --- /dev/null +++ b/packages/device-connect-edge/tests/fuzz/corpus/credentials_json/valid_flat.json @@ -0,0 +1 @@ +{"jwt":"eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5In0.test","nkey_seed":"SUAM2B7EACOOJNBGQWZLOEYHZWBMP6R2YA6SKHFUHLQHKP5QGPBVQNMSQ"} \ No newline at end of file diff --git a/packages/device-connect-edge/tests/fuzz/corpus/credentials_json/valid_nested.json b/packages/device-connect-edge/tests/fuzz/corpus/credentials_json/valid_nested.json new file mode 100644 index 0000000..1d71dc7 --- /dev/null +++ b/packages/device-connect-edge/tests/fuzz/corpus/credentials_json/valid_nested.json @@ -0,0 +1 @@ +{"nats":{"jwt":"eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5In0.test","nkey_seed":"SUAM2B7EACOOJNBGQWZLOEYHZWBMP6R2YA6SKHFUHLQHKP5QGPBVQNMSQ"}} \ No newline at end of file diff --git a/packages/device-connect-edge/tests/fuzz/corpus/jsonrpc_cmd/cmd_with_meta.json b/packages/device-connect-edge/tests/fuzz/corpus/jsonrpc_cmd/cmd_with_meta.json new file mode 100644 index 0000000..ffc61d6 --- /dev/null +++ b/packages/device-connect-edge/tests/fuzz/corpus/jsonrpc_cmd/cmd_with_meta.json @@ -0,0 +1 @@ +{"jsonrpc":"2.0","method":"doSomething","id":"req-003","params":{"_dc_meta":{"source_device":"sensor-01","traceparent":"00-abcdef1234567890abcdef1234567890-1234567890abcdef-01","tracestate":"congo=t61rcWkgMzE"},"value":42}} \ No newline at end of file diff --git a/packages/device-connect-edge/tests/fuzz/corpus/jsonrpc_cmd/cmd_with_params.json b/packages/device-connect-edge/tests/fuzz/corpus/jsonrpc_cmd/cmd_with_params.json new file mode 100644 index 0000000..0afe3d6 --- /dev/null +++ b/packages/device-connect-edge/tests/fuzz/corpus/jsonrpc_cmd/cmd_with_params.json @@ -0,0 +1 @@ +{"jsonrpc":"2.0","method":"captureImage","id":"req-002","params":{"resolution":"1080p","format":"jpeg"}} \ No newline at end of file diff --git a/packages/device-connect-edge/tests/fuzz/corpus/jsonrpc_cmd/minimal.json b/packages/device-connect-edge/tests/fuzz/corpus/jsonrpc_cmd/minimal.json new file mode 100644 index 0000000..2f19d76 --- /dev/null +++ b/packages/device-connect-edge/tests/fuzz/corpus/jsonrpc_cmd/minimal.json @@ -0,0 +1 @@ +{"id":1} \ No newline at end of file diff --git a/packages/device-connect-edge/tests/fuzz/corpus/jsonrpc_cmd/valid_cmd.json b/packages/device-connect-edge/tests/fuzz/corpus/jsonrpc_cmd/valid_cmd.json new file mode 100644 index 0000000..96ffefd --- /dev/null +++ b/packages/device-connect-edge/tests/fuzz/corpus/jsonrpc_cmd/valid_cmd.json @@ -0,0 +1 @@ +{"jsonrpc":"2.0","method":"get_status","id":"req-001","params":{}} \ No newline at end of file diff --git a/packages/device-connect-edge/tests/fuzz/corpus/nats_creds/empty_creds.txt b/packages/device-connect-edge/tests/fuzz/corpus/nats_creds/empty_creds.txt new file mode 100644 index 0000000..ed40a0d --- /dev/null +++ b/packages/device-connect-edge/tests/fuzz/corpus/nats_creds/empty_creds.txt @@ -0,0 +1 @@ +no markers here at all diff --git a/packages/device-connect-edge/tests/fuzz/corpus/nats_creds/valid_creds.txt b/packages/device-connect-edge/tests/fuzz/corpus/nats_creds/valid_creds.txt new file mode 100644 index 0000000..c9e6474 --- /dev/null +++ b/packages/device-connect-edge/tests/fuzz/corpus/nats_creds/valid_creds.txt @@ -0,0 +1,7 @@ +-----BEGIN NATS USER JWT----- +eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiJUWEg2TUxKNlc2RDVKR0RZTEhGSUFKMk5ERVVaWEhQNkFSNTdIUUpGNkpYTlRMVVBDUUFRIiwiaWF0IjoxNjA5NDU5MjAwfQ.test +------END NATS USER JWT------ + +-----BEGIN USER NKEY SEED----- +SUAM2B7EACOOJNBGQWZLOEYHZWBMP6R2YA6SKHFUHLQHKP5QGPBVQNMSQ +------END USER NKEY SEED------ diff --git a/packages/device-connect-edge/tests/fuzz/corpus/pydantic_models/capabilities.json b/packages/device-connect-edge/tests/fuzz/corpus/pydantic_models/capabilities.json new file mode 100644 index 0000000..a846738 --- /dev/null +++ b/packages/device-connect-edge/tests/fuzz/corpus/pydantic_models/capabilities.json @@ -0,0 +1 @@ +{"description":"Test device","functions":[{"name":"getStatus","description":"Get status","parameters":{"type":"object","properties":{},"required":[]},"tags":[]}],"events":[{"name":"event/alert","description":"Alert event","tags":["alert"]}]} \ No newline at end of file diff --git a/packages/device-connect-edge/tests/fuzz/corpus/pydantic_models/device_identity.json b/packages/device-connect-edge/tests/fuzz/corpus/pydantic_models/device_identity.json new file mode 100644 index 0000000..8414017 --- /dev/null +++ b/packages/device-connect-edge/tests/fuzz/corpus/pydantic_models/device_identity.json @@ -0,0 +1 @@ +{"device_type":"camera","manufacturer":"Acme","model":"CamX1","serial_number":"SN-001","firmware_version":"1.0.0","arch":"arm64","description":"Test camera"} \ No newline at end of file diff --git a/packages/device-connect-edge/tests/fuzz/corpus/pydantic_models/device_status.json b/packages/device-connect-edge/tests/fuzz/corpus/pydantic_models/device_status.json new file mode 100644 index 0000000..5b5150d --- /dev/null +++ b/packages/device-connect-edge/tests/fuzz/corpus/pydantic_models/device_status.json @@ -0,0 +1 @@ +{"location":"warehouse-A","availability":"busy","busy_score":0.7,"battery":85,"online":true} \ No newline at end of file diff --git a/packages/device-connect-edge/tests/fuzz/fuzz_credentials_json.py b/packages/device-connect-edge/tests/fuzz/fuzz_credentials_json.py new file mode 100644 index 0000000..4868668 --- /dev/null +++ b/packages/device-connect-edge/tests/fuzz/fuzz_credentials_json.py @@ -0,0 +1,44 @@ +"""Fuzz target: JSON credentials file loader. + +Exercises MessagingConfig._load_credentials_file() which parses JSON +credential files and falls back to NATS .creds format on JSONDecodeError. + +Run: + python tests/fuzz/fuzz_credentials_json.py fuzz/corpus/credentials_json/ -max_total_time=300 +""" + +import sys +import os +import tempfile + +import atheris + +with atheris.instrument_imports(): + from device_connect_edge.messaging.config import MessagingConfig + + +def TestOneInput(data: bytes) -> None: + """Write fuzzed data to a temp file and load it as a credentials file.""" + fd, path = tempfile.mkstemp(suffix=".json") + try: + with os.fdopen(fd, "wb") as f: + f.write(data) + result = MessagingConfig._load_credentials_file(path) + # Should always return a dict + assert isinstance(result, dict) + except (OSError, IOError, UnicodeDecodeError): + pass + finally: + try: + os.unlink(path) + except OSError: + pass + + +def main(): + atheris.Setup(sys.argv, TestOneInput) + atheris.Fuzz() + + +if __name__ == "__main__": + main() diff --git a/packages/device-connect-edge/tests/fuzz/fuzz_jsonrpc_cmd.py b/packages/device-connect-edge/tests/fuzz/fuzz_jsonrpc_cmd.py new file mode 100644 index 0000000..e9d860d --- /dev/null +++ b/packages/device-connect-edge/tests/fuzz/fuzz_jsonrpc_cmd.py @@ -0,0 +1,63 @@ +"""Fuzz target: JSON-RPC command message parsing. + +Exercises the same parsing path as DeviceRuntime._cmd_subscription().on_msg(), +which receives raw bytes from the messaging broker and extracts method, id, +params, and _dc_meta fields. + +Run: + python tests/fuzz/fuzz_jsonrpc_cmd.py fuzz/corpus/jsonrpc_cmd/ -max_total_time=300 +""" + +import sys +import json + +import atheris + +with atheris.instrument_imports(): + from device_connect_edge.telemetry.propagation import extract_from_meta + + +def TestOneInput(data: bytes) -> None: + """Simulate JSON-RPC command parsing from device.py:1037-1102.""" + try: + payload = json.loads(data) + except (json.JSONDecodeError, UnicodeDecodeError): + return + + if not isinstance(payload, dict): + return + + # Mirror the parsing in _cmd_subscription + method = payload.get("method") + + if "id" not in payload: + return + + msg_id = payload["id"] + params_dict = payload.get("params", {}) + + if not isinstance(params_dict, dict): + return + + # Extract _dc_meta (trace metadata) + dc_meta = params_dict.pop("_dc_meta", {}) + if isinstance(dc_meta, dict): + dc_meta.get("source_device") + # Exercise OTel context extraction + extract_from_meta(dc_meta) + + # Exercise response building + from device_connect_edge.device import build_rpc_response, build_rpc_error + + if method and isinstance(method, str): + build_rpc_response(str(msg_id), {"status": "ok"}) + build_rpc_error(str(msg_id), -32601, f"Unknown method: {method}") + + +def main(): + atheris.Setup(sys.argv, TestOneInput) + atheris.Fuzz() + + +if __name__ == "__main__": + main() diff --git a/packages/device-connect-edge/tests/fuzz/fuzz_nats_creds.py b/packages/device-connect-edge/tests/fuzz/fuzz_nats_creds.py new file mode 100644 index 0000000..aa721a5 --- /dev/null +++ b/packages/device-connect-edge/tests/fuzz/fuzz_nats_creds.py @@ -0,0 +1,51 @@ +"""Fuzz target: NATS .creds file parser. + +Exercises MessagingConfig._parse_nats_creds_file() which uses manual +string parsing with .find() and slicing to extract JWT and NKey seed +from NATS credential files. + +Run: + python tests/fuzz/fuzz_nats_creds.py fuzz/corpus/nats_creds/ -max_total_time=300 +""" + +import sys +import os +import tempfile + +import atheris + +with atheris.instrument_imports(): + from device_connect_edge.messaging.config import MessagingConfig + + +def TestOneInput(data: bytes) -> None: + """Write fuzzed data to a temp file and parse it as a .creds file.""" + try: + content = data.decode("utf-8", errors="replace") + except Exception: + return + + # _parse_nats_creds_file reads from a file path, so write to temp file + fd, path = tempfile.mkstemp(suffix=".creds") + try: + with os.fdopen(fd, "w") as f: + f.write(content) + result = MessagingConfig._parse_nats_creds_file(path) + # Verify the result is always a dict + assert isinstance(result, dict) + except (OSError, IOError): + pass + finally: + try: + os.unlink(path) + except OSError: + pass + + +def main(): + atheris.Setup(sys.argv, TestOneInput) + atheris.Fuzz() + + +if __name__ == "__main__": + main() diff --git a/packages/device-connect-edge/tests/fuzz/fuzz_pydantic_models.py b/packages/device-connect-edge/tests/fuzz/fuzz_pydantic_models.py new file mode 100644 index 0000000..f3f24d4 --- /dev/null +++ b/packages/device-connect-edge/tests/fuzz/fuzz_pydantic_models.py @@ -0,0 +1,58 @@ +"""Fuzz target: Pydantic model validation. + +Exercises all core Pydantic models (DeviceIdentity, DeviceStatus, +DeviceCapabilities, FunctionDef, EventDef) with arbitrary JSON input. +Looks for unexpected exceptions during model_validate(). + +Run: + python tests/fuzz/fuzz_pydantic_models.py fuzz/corpus/pydantic_models/ -max_total_time=300 +""" + +import sys +import json + +import atheris + +with atheris.instrument_imports(): + from pydantic import ValidationError + from device_connect_edge.types import ( + DeviceCapabilities, + DeviceIdentity, + DeviceStatus, + EventDef, + FunctionDef, + ) + +MODELS = [FunctionDef, EventDef, DeviceCapabilities, DeviceIdentity, DeviceStatus] + + +def TestOneInput(data: bytes) -> None: + """Feed fuzzed JSON into each Pydantic model.""" + if len(data) < 2: + return + + # Use first byte to select model, rest as JSON input + model_idx = data[0] % len(MODELS) + remaining = data[1:] + + try: + parsed = json.loads(remaining) + except (json.JSONDecodeError, UnicodeDecodeError): + return + + if not isinstance(parsed, dict): + return + + try: + MODELS[model_idx].model_validate(parsed) + except (ValidationError, TypeError, ValueError): + pass # Expected — Pydantic rejecting bad input + + +def main(): + atheris.Setup(sys.argv, TestOneInput) + atheris.Fuzz() + + +if __name__ == "__main__": + main() diff --git a/packages/device-connect-edge/tests/fuzz/report_hypothesis.py b/packages/device-connect-edge/tests/fuzz/report_hypothesis.py new file mode 100644 index 0000000..d9c27cc --- /dev/null +++ b/packages/device-connect-edge/tests/fuzz/report_hypothesis.py @@ -0,0 +1,89 @@ +"""Parse JUnit XML from hypothesis fuzz tests and output a GitHub-flavored markdown summary. + +Usage: + python tests/fuzz/report_hypothesis.py fuzz-hypothesis-results.xml +""" + +import sys +import xml.etree.ElementTree as ET +from pathlib import Path + + +def main(): + if len(sys.argv) < 2: + print("Usage: python report_hypothesis.py ", file=sys.stderr) + sys.exit(1) + + xml_path = Path(sys.argv[1]) + if not xml_path.exists(): + print("## Hypothesis Fuzz Tests\n\n> No results file found.", file=sys.stderr) + sys.exit(1) + + tree = ET.parse(xml_path) + root = tree.getroot() + + # Collect stats + total = 0 + passed = 0 + failed = 0 + errors = 0 + failures = [] + + for suite in root.iter("testsuite"): + for case in suite.iter("testcase"): + total += 1 + failure_el = case.find("failure") + error_el = case.find("error") + if failure_el is not None: + failed += 1 + failures.append({ + "name": case.get("name", "unknown"), + "classname": case.get("classname", ""), + "message": failure_el.get("message", ""), + "text": (failure_el.text or "").strip(), + }) + elif error_el is not None: + errors += 1 + failures.append({ + "name": case.get("name", "unknown"), + "classname": case.get("classname", ""), + "message": error_el.get("message", ""), + "text": (error_el.text or "").strip(), + }) + else: + passed += 1 + + # Output markdown + status = "pass" if failed == 0 and errors == 0 else "fail" + icon = "\u2705" if status == "pass" else "\u274c" + + print(f"## {icon} Hypothesis Fuzz Tests\n") + print("| Metric | Count |") + print("|--------|-------|") + print(f"| Total tests | {total} |") + print(f"| Passed | {passed} |") + print(f"| Failed | {failed} |") + print(f"| Errors | {errors} |") + print() + + if failures: + print("### Findings\n") + for i, f in enumerate(failures, 1): + print(f"#### {i}. `{f['name']}`\n") + if f["classname"]: + print(f"**File**: `{f['classname']}`\n") + if f["message"]: + print(f"**Error**: `{f['message']}`\n") + if f["text"]: + # Truncate very long tracebacks + text = f["text"] + lines = text.split("\n") + if len(lines) > 30: + text = "\n".join(lines[:25] + ["...", f"({len(lines) - 25} more lines)"]) + print(f"
Traceback\n\n```\n{text}\n```\n\n
\n") + else: + print("> No findings — all fuzz tests passed.\n") + + +if __name__ == "__main__": + main() diff --git a/packages/device-connect-edge/tests/fuzz/run_atheris.py b/packages/device-connect-edge/tests/fuzz/run_atheris.py new file mode 100644 index 0000000..a9d907b --- /dev/null +++ b/packages/device-connect-edge/tests/fuzz/run_atheris.py @@ -0,0 +1,227 @@ +"""Runner for all atheris fuzz targets with markdown report generation. + +Runs each fuzz target across all packages, captures results, and writes +a single combined summary report to atheris-report.md. + +Usage: + python packages/device-connect-edge/tests/fuzz/run_atheris.py --iterations=50000 +""" + +import argparse +import glob +import shutil +import subprocess +import sys +import tempfile +import time +from pathlib import Path + +FUZZ_DIR = Path(__file__).parent +# Resolve to repo root (4 levels up from tests/fuzz/ inside a package) +REPO_ROOT = FUZZ_DIR.parent.parent.parent.parent + +TARGETS = [ + # ── device-connect-edge ── + { + "name": "Edge: JSON-RPC Commands", + "package": "packages/device-connect-edge", + "script": "tests/fuzz/fuzz_jsonrpc_cmd.py", + "corpus": "tests/fuzz/corpus/jsonrpc_cmd/", + }, + { + "name": "Edge: NATS Credentials", + "package": "packages/device-connect-edge", + "script": "tests/fuzz/fuzz_nats_creds.py", + "corpus": "tests/fuzz/corpus/nats_creds/", + }, + { + "name": "Edge: Pydantic Models", + "package": "packages/device-connect-edge", + "script": "tests/fuzz/fuzz_pydantic_models.py", + "corpus": "tests/fuzz/corpus/pydantic_models/", + }, + { + "name": "Edge: Credentials JSON", + "package": "packages/device-connect-edge", + "script": "tests/fuzz/fuzz_credentials_json.py", + "corpus": "tests/fuzz/corpus/credentials_json/", + }, + # ── device-connect-server ── + { + "name": "Server: Credentials Loader", + "package": "packages/device-connect-server", + "script": "tests/fuzz/fuzz_credentials.py", + "corpus": "tests/fuzz/corpus/credentials_json/", + }, + { + "name": "Server: PIN Parsing", + "package": "packages/device-connect-server", + "script": "tests/fuzz/fuzz_commissioning.py", + "corpus": "tests/fuzz/corpus/commissioning/", + }, + # ── device-connect-agent-tools ── + { + "name": "Agent: Tool Name Parsing", + "package": "packages/device-connect-agent-tools", + "script": "tests/fuzz/fuzz_schema.py", + "corpus": "tests/fuzz/corpus/tool_names/", + }, + { + "name": "Agent: JSON-RPC Parsing", + "package": "packages/device-connect-agent-tools", + "script": "tests/fuzz/fuzz_jsonrpc_parsing.py", + "corpus": "tests/fuzz/corpus/jsonrpc_messages/", + }, +] + + +def run_target(target, iterations): + """Run a single fuzz target. Returns dict with results. + + Uses a temporary directory for the live corpus so atheris doesn't + write auto-generated entries into the seed corpus directory. + Seeds are copied in, and the temp dir is cleaned up after the run. + """ + pkg_dir = REPO_ROOT / target["package"] + seed_dir = pkg_dir / target["corpus"] + tmp_corpus = tempfile.mkdtemp(prefix=f"fuzz-corpus-{target['script'].split('/')[-1]}-") + + # Copy seed files into the temp corpus + for f in seed_dir.iterdir(): + if f.is_file(): + shutil.copy2(f, tmp_corpus) + + cmd = [ + sys.executable, + str(pkg_dir / target["script"]), + tmp_corpus, + f"-atheris_runs={iterations}", + ] + + # Snapshot existing crash files before this target + pre_crashes = set(glob.glob(str(REPO_ROOT / "crash-*"))) + + start = time.time() + result = subprocess.run( + cmd, + capture_output=True, + text=True, + cwd=str(REPO_ROOT), + ) + elapsed = time.time() - start + + # Clean up temp corpus + shutil.rmtree(tmp_corpus, ignore_errors=True) + + # Only attribute NEW crash files to this target + post_crashes = set(glob.glob(str(REPO_ROOT / "crash-*"))) + crashes = sorted(post_crashes - pre_crashes) + + return { + "name": target["name"], + "script": target["script"], + "returncode": result.returncode, + "elapsed": elapsed, + "iterations": iterations, + "stdout": result.stdout, + "stderr": result.stderr, + "crashes": crashes, + } + + +def extract_crash_info(stdout, stderr): + """Extract crash details from atheris output. + + The Python traceback goes to stdout, while libFuzzer summary + and artifact paths go to stderr. + """ + combined = stdout + "\n" + stderr + lines = combined.strip().split("\n") + crash_lines = [] + capture = False + for line in lines: + if "Uncaught Python exception" in line: + capture = True + if capture: + # Skip noisy instrumentation/libfuzzer info/stats lines + if line.startswith(("INFO:", "WARNING:", "#")): + continue + crash_lines.append(line) + if line.startswith("artifact_prefix"): + break + return "\n".join(crash_lines) if crash_lines else None + + +def generate_report(results, report_path): + """Generate markdown report from all target results.""" + total_crashes = sum(len(r["crashes"]) for r in results) + icon = "\u2705" if total_crashes == 0 else "\u274c" + + lines = [] + lines.append(f"## {icon} Atheris Fuzz Tests\n") + lines.append("| Target | Iterations | Duration | Result |") + lines.append("|--------|-----------|----------|--------|") + + for r in results: + status = "\u274c Crash" if r["returncode"] != 0 else "\u2705 Clean" + duration = f"{r['elapsed']:.1f}s" + lines.append(f"| {r['name']} | {r['iterations']:,} | {duration} | {status} |") + + lines.append("") + + # Detail any crashes + findings = [r for r in results if r["returncode"] != 0] + if findings: + lines.append("### Findings\n") + for i, r in enumerate(findings, 1): + lines.append(f"#### {i}. `{r['script']}`\n") + + crash_info = extract_crash_info(r["stdout"], r["stderr"]) + if crash_info: + lines.append("
Crash details\n") + lines.append(f"```\n{crash_info}\n```\n") + lines.append("
\n") + + if r["crashes"]: + crash_files = ", ".join(f"`{Path(c).name}`" for c in r["crashes"]) + lines.append(f"**Crash artifacts**: {crash_files}\n") + else: + lines.append("> No findings — all fuzz targets completed cleanly.\n") + + report = "\n".join(lines) + report_path.write_text(report) + return report + + +def main(): + parser = argparse.ArgumentParser(description="Run atheris fuzz targets") + parser.add_argument("--iterations", type=int, default=50000, help="Iterations per target") + args = parser.parse_args() + + # Clean old crash files + for f in glob.glob(str(REPO_ROOT / "crash-*")): + Path(f).unlink() + + results = [] + exit_code = 0 + + for target in TARGETS: + print(f"Running {target['name']}...", flush=True) + result = run_target(target, args.iterations) + results.append(result) + + if result["returncode"] != 0: + exit_code = 1 + print(f" CRASH found in {target['name']}", flush=True) + else: + print(f" Clean ({result['elapsed']:.1f}s)", flush=True) + + report_path = REPO_ROOT / "atheris-report.md" + generate_report(results, report_path) + print(f"\nReport written to {report_path}") + + sys.exit(exit_code) + + +if __name__ == "__main__": + main() diff --git a/packages/device-connect-edge/tests/fuzz/test_fuzz_credentials_json.py b/packages/device-connect-edge/tests/fuzz/test_fuzz_credentials_json.py new file mode 100644 index 0000000..b9f9dea --- /dev/null +++ b/packages/device-connect-edge/tests/fuzz/test_fuzz_credentials_json.py @@ -0,0 +1,90 @@ +"""Property-based fuzz tests for JSON credentials file loader. + +Run: + pytest tests/fuzz/test_fuzz_credentials_json.py -v +""" + +import json +import os +import tempfile + +from hypothesis import given, settings, HealthCheck +from hypothesis import strategies as st + +from device_connect_edge.messaging.config import MessagingConfig + + +json_primitives = st.one_of( + st.none(), + st.booleans(), + st.integers(min_value=-(2**53), max_value=2**53), + st.floats(allow_nan=False, allow_infinity=False), + st.text(max_size=100), +) + +json_values = st.recursive( + json_primitives, + lambda children: st.one_of( + st.lists(children, max_size=3), + st.dictionaries(st.text(max_size=20), children, max_size=3), + ), + max_leaves=10, +) + + +@given(data=st.dictionaries(st.text(max_size=30), json_values, max_size=10)) +@settings(max_examples=3000, suppress_health_check=[HealthCheck.too_slow]) +def test_load_credentials_json_never_crashes(data): + """_load_credentials_file with valid JSON must always return a dict.""" + fd, path = tempfile.mkstemp(suffix=".json") + try: + with os.fdopen(fd, "w") as f: + json.dump(data, f) + result = MessagingConfig._load_credentials_file(path) + assert isinstance(result, dict) + finally: + try: + os.unlink(path) + except OSError: + pass + + +@given(data=st.binary(max_size=2000)) +@settings(max_examples=3000, suppress_health_check=[HealthCheck.too_slow]) +def test_load_credentials_raw_bytes_never_crashes(data): + """_load_credentials_file with arbitrary bytes must not crash.""" + fd, path = tempfile.mkstemp(suffix=".json") + try: + with os.fdopen(fd, "wb") as f: + f.write(data) + result = MessagingConfig._load_credentials_file(path) + assert isinstance(result, dict) + except UnicodeDecodeError: + pass # Acceptable when file contains non-UTF-8 bytes + finally: + try: + os.unlink(path) + except OSError: + pass + + +@given( + jwt=st.text(max_size=200), + nkey_seed=st.text(max_size=200), +) +@settings(max_examples=2000) +def test_load_credentials_nested_nats_format(jwt, nkey_seed): + """Nested NATS format should extract jwt and nkey_seed correctly.""" + data = {"nats": {"jwt": jwt, "nkey_seed": nkey_seed}} + fd, path = tempfile.mkstemp(suffix=".json") + try: + with os.fdopen(fd, "w") as f: + json.dump(data, f) + result = MessagingConfig._load_credentials_file(path) + assert result["jwt"] == jwt + assert result["nkey_seed"] == nkey_seed + finally: + try: + os.unlink(path) + except OSError: + pass diff --git a/packages/device-connect-edge/tests/fuzz/test_fuzz_jsonrpc_cmd.py b/packages/device-connect-edge/tests/fuzz/test_fuzz_jsonrpc_cmd.py new file mode 100644 index 0000000..aace384 --- /dev/null +++ b/packages/device-connect-edge/tests/fuzz/test_fuzz_jsonrpc_cmd.py @@ -0,0 +1,104 @@ +"""Property-based fuzz tests for JSON-RPC command parsing. + +Uses hypothesis for portable, pytest-integrated fuzzing. +These tests exercise the same code paths as fuzz_jsonrpc_cmd.py (atheris) +but run on any platform without needing libFuzzer. + +Run: + pytest tests/fuzz/test_fuzz_jsonrpc_cmd.py -v + pytest tests/fuzz/test_fuzz_jsonrpc_cmd.py -v --hypothesis-seed=0 # reproducible +""" + +import json + +from hypothesis import given, settings, HealthCheck +from hypothesis import strategies as st + +from device_connect_edge.device import build_rpc_response, build_rpc_error +from device_connect_edge.telemetry.propagation import extract_from_meta + + +# --- Strategies --- + +json_primitives = st.one_of( + st.none(), + st.booleans(), + st.integers(min_value=-(2**53), max_value=2**53), + st.floats(allow_nan=False, allow_infinity=False), + st.text(max_size=200), +) + +json_values = st.recursive( + json_primitives, + lambda children: st.one_of( + st.lists(children, max_size=5), + st.dictionaries(st.text(max_size=20), children, max_size=5), + ), + max_leaves=20, +) + +jsonrpc_like = st.fixed_dictionaries( + {}, + optional={ + "jsonrpc": st.sampled_from(["2.0", "1.0", "", None, 2]), + "method": st.one_of(st.text(max_size=100), st.none(), st.integers()), + "id": st.one_of(st.text(max_size=50), st.integers(), st.none()), + "params": json_values, + }, +) + + +# --- Tests --- + +@given(data=st.binary(max_size=4096)) +@settings(max_examples=5000, suppress_health_check=[HealthCheck.too_slow]) +def test_jsonrpc_raw_bytes_never_crashes(data): + """Raw bytes must never cause an unhandled exception in the parsing path.""" + try: + payload = json.loads(data) + except (json.JSONDecodeError, UnicodeDecodeError): + return + + if not isinstance(payload, dict): + return + + payload.get("method") + if "id" not in payload: + return + + params_dict = payload.get("params", {}) + if not isinstance(params_dict, dict): + return + + dc_meta = params_dict.pop("_dc_meta", {}) + if isinstance(dc_meta, dict): + extract_from_meta(dc_meta) + + +@given(msg=jsonrpc_like) +@settings(max_examples=5000, suppress_health_check=[HealthCheck.too_slow]) +def test_jsonrpc_structured_never_crashes(msg): + """Structured JSON-RPC-like dicts must never crash the parsing path.""" + method = msg.get("method") + if "id" not in msg: + return + + msg_id = msg["id"] + params_dict = msg.get("params", {}) + if not isinstance(params_dict, dict): + return + + dc_meta = params_dict.pop("_dc_meta", {}) + if isinstance(dc_meta, dict): + extract_from_meta(dc_meta) + + if method and isinstance(method, str) and msg_id is not None: + build_rpc_response(str(msg_id), {"status": "ok"}) + build_rpc_error(str(msg_id), -32601, f"Unknown method: {method}") + + +@given(meta=st.dictionaries(st.text(max_size=50), st.text(max_size=200), max_size=10)) +@settings(max_examples=2000) +def test_extract_from_meta_never_crashes(meta): + """extract_from_meta must handle arbitrary dict input.""" + extract_from_meta(meta) diff --git a/packages/device-connect-edge/tests/fuzz/test_fuzz_nats_creds.py b/packages/device-connect-edge/tests/fuzz/test_fuzz_nats_creds.py new file mode 100644 index 0000000..95b1d2c --- /dev/null +++ b/packages/device-connect-edge/tests/fuzz/test_fuzz_nats_creds.py @@ -0,0 +1,87 @@ +"""Property-based fuzz tests for NATS .creds file parser. + +Run: + pytest tests/fuzz/test_fuzz_nats_creds.py -v +""" + +import os +import tempfile + +from hypothesis import given, settings, HealthCheck +from hypothesis import strategies as st + +from device_connect_edge.messaging.config import MessagingConfig + + +# Strategies that include the real markers to exercise extraction logic +nats_markers = st.sampled_from([ + "-----BEGIN NATS USER JWT-----", + "------END NATS USER JWT------", + "-----BEGIN USER NKEY SEED-----", + "------END USER NKEY SEED------", + "", +]) + +creds_content = st.one_of( + # Totally random text + st.text(max_size=2000), + # Random binary decoded lossily + st.binary(max_size=2000).map(lambda b: b.decode("utf-8", errors="replace")), + # Structured with real markers mixed with random content + st.tuples( + nats_markers, st.text(max_size=200), + nats_markers, st.text(max_size=200), + nats_markers, st.text(max_size=200), + nats_markers, + ).map(lambda parts: "\n".join(parts)), +) + + +@given(content=creds_content) +@settings(max_examples=3000, suppress_health_check=[HealthCheck.too_slow]) +def test_parse_nats_creds_never_crashes(content): + """_parse_nats_creds_file must always return a dict, never crash.""" + fd, path = tempfile.mkstemp(suffix=".creds") + try: + with os.fdopen(fd, "w") as f: + f.write(content) + result = MessagingConfig._parse_nats_creds_file(path) + assert isinstance(result, dict) + # If keys present, they should be strings + for key in ("jwt", "nkey_seed"): + if key in result: + assert isinstance(result[key], str) + finally: + try: + os.unlink(path) + except OSError: + pass + + +@given(content=st.text(max_size=500)) +@settings(max_examples=2000) +def test_parse_nats_creds_roundtrip(content): + """If markers are present, extracted values should be substrings of input.""" + jwt_begin = "-----BEGIN NATS USER JWT-----" + jwt_end = "------END NATS USER JWT------" + nkey_begin = "-----BEGIN USER NKEY SEED-----" + nkey_end = "------END USER NKEY SEED------" + + full = f"{jwt_begin}\n{content}\n{jwt_end}\n{nkey_begin}\n{content}\n{nkey_end}\n" + + fd, path = tempfile.mkstemp(suffix=".creds") + try: + with os.fdopen(fd, "w") as f: + f.write(full) + result = MessagingConfig._parse_nats_creds_file(path) + assert isinstance(result, dict) + # Markers are present, so jwt and nkey_seed should be extracted + assert "jwt" in result + assert "nkey_seed" in result + assert isinstance(result["jwt"], str) + assert isinstance(result["nkey_seed"], str) + finally: + try: + os.unlink(path) + except OSError: + pass diff --git a/packages/device-connect-edge/tests/fuzz/test_fuzz_pydantic_models.py b/packages/device-connect-edge/tests/fuzz/test_fuzz_pydantic_models.py new file mode 100644 index 0000000..02505e9 --- /dev/null +++ b/packages/device-connect-edge/tests/fuzz/test_fuzz_pydantic_models.py @@ -0,0 +1,98 @@ +"""Property-based fuzz tests for Pydantic model validation. + +Run: + pytest tests/fuzz/test_fuzz_pydantic_models.py -v +""" + +import json + +from hypothesis import given, settings, HealthCheck +from hypothesis import strategies as st +from pydantic import ValidationError + +from device_connect_edge.types import ( + DeviceCapabilities, + DeviceIdentity, + DeviceStatus, + EventDef, + FunctionDef, +) + +MODELS = [FunctionDef, EventDef, DeviceCapabilities, DeviceIdentity, DeviceStatus] + +# Strategy: arbitrary JSON-like dicts +json_primitives = st.one_of( + st.none(), + st.booleans(), + st.integers(min_value=-(2**53), max_value=2**53), + st.floats(allow_nan=False, allow_infinity=False), + st.text(max_size=100), +) + +json_values = st.recursive( + json_primitives, + lambda children: st.one_of( + st.lists(children, max_size=3), + st.dictionaries(st.text(max_size=20), children, max_size=5), + ), + max_leaves=15, +) + +arbitrary_dict = st.dictionaries(st.text(max_size=30), json_values, max_size=10) + + +@given(data=arbitrary_dict, model_idx=st.integers(min_value=0, max_value=len(MODELS) - 1)) +@settings(max_examples=5000, suppress_health_check=[HealthCheck.too_slow]) +def test_pydantic_models_handle_arbitrary_dicts(data, model_idx): + """Pydantic models must either validate or raise ValidationError, never crash.""" + model = MODELS[model_idx] + try: + model.model_validate(data) + except (ValidationError, TypeError, ValueError): + pass # Expected rejections + + +@given(raw=st.binary(max_size=4096)) +@settings(max_examples=3000, suppress_health_check=[HealthCheck.too_slow]) +def test_pydantic_models_handle_raw_bytes(raw): + """Parsing arbitrary bytes as JSON then validating must not crash.""" + try: + parsed = json.loads(raw) + except (json.JSONDecodeError, UnicodeDecodeError): + return + + if not isinstance(parsed, dict): + return + + for model in MODELS: + try: + model.model_validate(parsed) + except (ValidationError, TypeError, ValueError): + pass + + +# Targeted strategies for models with constraints +@given( + busy_score=st.one_of( + st.floats(allow_nan=True, allow_infinity=True), + st.integers(), + st.text(max_size=20), + st.none(), + ), + battery=st.one_of( + st.integers(min_value=-1000, max_value=1000), + st.floats(), + st.text(max_size=20), + st.none(), + ), +) +@settings(max_examples=2000) +def test_device_status_constrained_fields(busy_score, battery): + """DeviceStatus with constrained fields (busy_score 0-1, battery 0-100).""" + try: + DeviceStatus.model_validate({ + "busy_score": busy_score, + "battery": battery, + }) + except (ValidationError, TypeError, ValueError): + pass diff --git a/packages/device-connect-server/pyproject.toml b/packages/device-connect-server/pyproject.toml index c57ffe2..de1d11d 100644 --- a/packages/device-connect-server/pyproject.toml +++ b/packages/device-connect-server/pyproject.toml @@ -51,6 +51,10 @@ dev = [ "pytest-timeout>=2.0", "device-connect-server[security]", ] +fuzz = [ + "hypothesis>=6.0", + "coverage>=7.0", +] all = [ "device-connect-server[security,telemetry,state,logging,mqtt,dev]", ] diff --git a/packages/device-connect-server/tests/fuzz/README.md b/packages/device-connect-server/tests/fuzz/README.md new file mode 100644 index 0000000..08f243e --- /dev/null +++ b/packages/device-connect-server/tests/fuzz/README.md @@ -0,0 +1,53 @@ +# Fuzz Testing for Device Connect Server + +Fuzz tests for the server package using [Hypothesis](https://hypothesis.readthedocs.io/) (property-based, pytest-integrated) and [Atheris](https://github.com/google/atheris) (coverage-guided, libFuzzer-based). + +For full setup instructions (installing atheris on macOS/Linux, deep fuzzing, CI integration), see the [edge fuzz README](../../device-connect-edge/tests/fuzz/README.md). + +## Fuzz Targets + +| Target | Hypothesis | Atheris | What it tests | +|--------|-----------|---------|---------------| +| Credentials Loader | `test_fuzz_credentials.py` | `fuzz_credentials.py` | `CredentialsLoader._parse_json_format()` and `_parse_nats_creds_format()` — JSON and regex-based credential parsing | +| PIN Parsing | `test_fuzz_commissioning.py` | `fuzz_commissioning.py` | `parse_pin()` and `format_pin()` — PIN string manipulation and roundtrip | + +## Running Locally + +### Hypothesis (pytest) + +```bash +cd packages/device-connect-server +pip install -e ".[dev,fuzz]" + +# Run all fuzz tests — findings shown in terminal output +pytest tests/fuzz/test_fuzz_*.py -v + +# More examples for deeper coverage +HYPOTHESIS_PROFILE=ci pytest tests/fuzz/test_fuzz_*.py -v +``` + +### Atheris + +```bash +pip install atheris # Linux: works directly. macOS: see edge README for LLVM setup. + +# Run individual targets directly +cd packages/device-connect-server +python tests/fuzz/fuzz_credentials.py tests/fuzz/corpus/credentials_json/ -max_total_time=300 +python tests/fuzz/fuzz_commissioning.py tests/fuzz/corpus/commissioning/ -max_total_time=300 + +# Deep fuzzing (1 hour per target, run indefinitely with no flags) +python tests/fuzz/fuzz_credentials.py tests/fuzz/corpus/credentials_json/ -max_total_time=3600 + +# Run all targets across ALL packages (from repo root) — writes atheris-report.md +python packages/device-connect-edge/tests/fuzz/run_atheris.py --iterations=50000 +``` + +### Where to find results + +| Tool | Where | +|------|-------| +| Hypothesis | Terminal output from pytest | +| Atheris (unified runner) | `atheris-report.md` at repo root | +| Atheris (direct) | `crash-` files in current directory | +| CI | GitHub Actions job summary (scroll down on run page) | diff --git a/packages/device-connect-server/tests/fuzz/conftest.py b/packages/device-connect-server/tests/fuzz/conftest.py new file mode 100644 index 0000000..a95a25b --- /dev/null +++ b/packages/device-connect-server/tests/fuzz/conftest.py @@ -0,0 +1,19 @@ +"""Hypothesis configuration for server fuzz tests.""" + +import os + +from hypothesis import settings, HealthCheck + +settings.register_profile( + "default", + max_examples=5000, + suppress_health_check=[HealthCheck.too_slow], +) + +settings.register_profile( + "ci", + max_examples=20000, + suppress_health_check=[HealthCheck.too_slow], +) + +settings.load_profile(os.getenv("HYPOTHESIS_PROFILE", "default")) diff --git a/packages/device-connect-server/tests/fuzz/corpus/commissioning/valid_pin.txt b/packages/device-connect-server/tests/fuzz/corpus/commissioning/valid_pin.txt new file mode 100644 index 0000000..18ab88b --- /dev/null +++ b/packages/device-connect-server/tests/fuzz/corpus/commissioning/valid_pin.txt @@ -0,0 +1 @@ +1234-5678 \ No newline at end of file diff --git a/packages/device-connect-server/tests/fuzz/corpus/credentials_json/valid_full.json b/packages/device-connect-server/tests/fuzz/corpus/credentials_json/valid_full.json new file mode 100644 index 0000000..ba50dde --- /dev/null +++ b/packages/device-connect-server/tests/fuzz/corpus/credentials_json/valid_full.json @@ -0,0 +1 @@ +{"device_id":"cam-001","tenant":"default","nats":{"urls":["tls://nats:4222"],"jwt":"eyJ0eXAiOiJKV1Qi.test","nkey_seed":"SUAM2B7EA","tls":{"ca_file":"/certs/ca.pem"}}} \ No newline at end of file diff --git a/packages/device-connect-server/tests/fuzz/corpus/credentials_json/valid_mqtt.json b/packages/device-connect-server/tests/fuzz/corpus/credentials_json/valid_mqtt.json new file mode 100644 index 0000000..3d3f663 --- /dev/null +++ b/packages/device-connect-server/tests/fuzz/corpus/credentials_json/valid_mqtt.json @@ -0,0 +1 @@ +{"mqtt":{"username":"device01","password":"s3cret"}} \ No newline at end of file diff --git a/packages/device-connect-server/tests/fuzz/corpus/nats_creds/valid.txt b/packages/device-connect-server/tests/fuzz/corpus/nats_creds/valid.txt new file mode 100644 index 0000000..01c17ee --- /dev/null +++ b/packages/device-connect-server/tests/fuzz/corpus/nats_creds/valid.txt @@ -0,0 +1,7 @@ +-----BEGIN NATS USER JWT----- +eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5In0.test +------END NATS USER JWT------ + +-----BEGIN USER NKEY SEED----- +SUAM2B7EACOOJNBGQWZLOEYHZWBMP6R2YA6SKHFUHLQHKP5QGPBVQNMSQ +------END USER NKEY SEED------ diff --git a/packages/device-connect-server/tests/fuzz/fuzz_commissioning.py b/packages/device-connect-server/tests/fuzz/fuzz_commissioning.py new file mode 100644 index 0000000..608d5ba --- /dev/null +++ b/packages/device-connect-server/tests/fuzz/fuzz_commissioning.py @@ -0,0 +1,31 @@ +"""Atheris fuzz target: PIN parsing. + +Run: + python tests/fuzz/fuzz_commissioning.py fuzz/corpus/commissioning/ -max_total_time=300 +""" + +import sys + +import atheris + +with atheris.instrument_imports(): + from device_connect_server.security.commissioning import parse_pin + + +def TestOneInput(data: bytes) -> None: + try: + text = data.decode("utf-8", errors="replace") + except Exception: + return + + result = parse_pin(text) + assert isinstance(result, str) + + +def main(): + atheris.Setup(sys.argv, TestOneInput) + atheris.Fuzz() + + +if __name__ == "__main__": + main() diff --git a/packages/device-connect-server/tests/fuzz/fuzz_credentials.py b/packages/device-connect-server/tests/fuzz/fuzz_credentials.py new file mode 100644 index 0000000..ec33bdc --- /dev/null +++ b/packages/device-connect-server/tests/fuzz/fuzz_credentials.py @@ -0,0 +1,40 @@ +"""Atheris fuzz target: server CredentialsLoader. + +Run: + python tests/fuzz/fuzz_credentials.py fuzz/corpus/credentials_json/ -max_total_time=300 +""" + +import sys +import json + +import atheris + +with atheris.instrument_imports(): + from device_connect_server.security.credentials import CredentialsLoader + + +def TestOneInput(data: bytes) -> None: + try: + text = data.decode("utf-8", errors="replace") + except Exception: + return + + # Try JSON parsing path + if text.strip().startswith("{"): + try: + CredentialsLoader._parse_json_format(text, "") + except (ValueError, TypeError, KeyError, AttributeError, + json.JSONDecodeError): + pass + + # Try NATS creds parsing path + CredentialsLoader._parse_nats_creds_format(text) + + +def main(): + atheris.Setup(sys.argv, TestOneInput) + atheris.Fuzz() + + +if __name__ == "__main__": + main() diff --git a/packages/device-connect-server/tests/fuzz/test_fuzz_commissioning.py b/packages/device-connect-server/tests/fuzz/test_fuzz_commissioning.py new file mode 100644 index 0000000..adc6669 --- /dev/null +++ b/packages/device-connect-server/tests/fuzz/test_fuzz_commissioning.py @@ -0,0 +1,30 @@ +"""Property-based fuzz tests for PIN parsing. + +Run: + pytest tests/fuzz/test_fuzz_commissioning.py -v +""" + +from hypothesis import given, settings +from hypothesis import strategies as st + +from device_connect_server.security.commissioning import parse_pin, format_pin + + +@given(pin=st.text(max_size=200)) +@settings(max_examples=5000) +def test_parse_pin_never_crashes(pin): + """parse_pin must handle arbitrary strings without crashing.""" + result = parse_pin(pin) + assert isinstance(result, str) + # Should strip dashes and spaces + assert "-" not in result + assert " " not in result + + +@given(pin=st.from_regex(r"[0-9]{8}", fullmatch=True)) +@settings(max_examples=2000) +def test_format_then_parse_roundtrip(pin): + """format_pin -> parse_pin should roundtrip for valid 8-digit PINs.""" + formatted = format_pin(pin) + parsed = parse_pin(formatted) + assert parsed == pin diff --git a/packages/device-connect-server/tests/fuzz/test_fuzz_credentials.py b/packages/device-connect-server/tests/fuzz/test_fuzz_credentials.py new file mode 100644 index 0000000..5dbfd7b --- /dev/null +++ b/packages/device-connect-server/tests/fuzz/test_fuzz_credentials.py @@ -0,0 +1,61 @@ +"""Property-based fuzz tests for server CredentialsLoader. + +Run: + pytest tests/fuzz/test_fuzz_credentials.py -v +""" + +import json + +from hypothesis import given, settings, HealthCheck +from hypothesis import strategies as st + +from device_connect_server.security.credentials import CredentialsLoader + + +json_primitives = st.one_of( + st.none(), + st.booleans(), + st.integers(min_value=-(2**53), max_value=2**53), + st.floats(allow_nan=False, allow_infinity=False), + st.text(max_size=100), +) + +json_values = st.recursive( + json_primitives, + lambda children: st.one_of( + st.lists(children, max_size=3), + st.dictionaries(st.text(max_size=20), children, max_size=5), + ), + max_leaves=15, +) + + +@given(data=st.dictionaries(st.text(max_size=30), json_values, max_size=10)) +@settings(max_examples=5000, suppress_health_check=[HealthCheck.too_slow]) +def test_parse_json_format_never_crashes(data): + """_parse_json_format must not crash on arbitrary dicts.""" + content = json.dumps(data) + try: + CredentialsLoader._parse_json_format(content, "") + except (ValueError, TypeError, KeyError, AttributeError): + pass + + +@given(content=st.text(max_size=2000)) +@settings(max_examples=5000, suppress_health_check=[HealthCheck.too_slow]) +def test_parse_nats_creds_format_never_crashes(content): + """_parse_nats_creds_format must always return a dict.""" + result = CredentialsLoader._parse_nats_creds_format(content) + assert isinstance(result, dict) + for key in ("jwt", "nkey_seed"): + if key in result: + assert isinstance(result[key], str) + + +@given(content=st.binary(max_size=2000)) +@settings(max_examples=3000, suppress_health_check=[HealthCheck.too_slow]) +def test_parse_nats_creds_format_raw_bytes(content): + """_parse_nats_creds_format must handle decoded binary.""" + text = content.decode("utf-8", errors="replace") + result = CredentialsLoader._parse_nats_creds_format(text) + assert isinstance(result, dict)