Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 69 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
python -m pip install --upgrade pip
pip install --only-binary eclipse-zenoh -e "packages/device-connect-edge[dev]"
- name: Run unit tests
run: pytest packages/device-connect-edge/tests/ -v --timeout=30 --tb=short
run: pytest packages/device-connect-edge/tests/ -v --timeout=30 --tb=short --ignore=packages/device-connect-edge/tests/fuzz

unit-tests-server:
runs-on: ubuntu-latest
Expand All @@ -49,7 +49,7 @@ jobs:
pip install --only-binary eclipse-zenoh -e packages/device-connect-edge
pip install --only-binary eclipse-zenoh -e "packages/device-connect-server[all]"
- name: Run unit tests
run: pytest packages/device-connect-server/tests/ -v --timeout=30 --tb=short
run: pytest packages/device-connect-server/tests/ -v --timeout=30 --tb=short --ignore=packages/device-connect-server/tests/fuzz

unit-tests-agent-tools:
runs-on: ubuntu-latest
Expand Down Expand Up @@ -86,6 +86,73 @@ jobs:
ruff check packages/device-connect-agent-tools/
ruff check tests/

# ── Fuzz tests (all packages, single report) ────────────────────
fuzz-tests-hypothesis:
runs-on: ubuntu-latest
timeout-minutes: 15
steps:
- uses: actions/checkout@v6
- uses: actions/setup-python@v6
with:
python-version: ${{ env.PYTHON_VERSION }}
cache: 'pip'
- name: Install all packages
run: |
python -m pip install --upgrade pip
pip install --only-binary eclipse-zenoh -e "packages/device-connect-edge[dev,fuzz]"
pip install --only-binary eclipse-zenoh -e "packages/device-connect-server[dev,fuzz]"
pip install --only-binary eclipse-zenoh -e "packages/device-connect-agent-tools[dev,fuzz]"
- name: Run hypothesis fuzz tests (all packages)
env:
HYPOTHESIS_PROFILE: ci
run: |
pytest \
packages/device-connect-edge/tests/fuzz/test_fuzz_*.py \
packages/device-connect-server/tests/fuzz/test_fuzz_*.py \
packages/device-connect-agent-tools/tests/fuzz/test_fuzz_*.py \
-v --timeout=120 --tb=short --junitxml=fuzz-hypothesis-results.xml
- name: Publish hypothesis findings summary
if: always()
run: python packages/device-connect-edge/tests/fuzz/report_hypothesis.py fuzz-hypothesis-results.xml >> "$GITHUB_STEP_SUMMARY"
- name: Upload hypothesis report
if: always()
uses: actions/upload-artifact@v5
with:
name: fuzz-hypothesis-results
path: fuzz-hypothesis-results.xml
retention-days: 30

fuzz-tests-atheris:
runs-on: ubuntu-latest
timeout-minutes: 20
steps:
- uses: actions/checkout@v6
- uses: actions/setup-python@v6
with:
python-version: ${{ env.PYTHON_VERSION }}
cache: 'pip'
- name: Install all packages
run: |
python -m pip install --upgrade pip
pip install --only-binary eclipse-zenoh -e "packages/device-connect-edge[dev,fuzz]"
pip install --only-binary eclipse-zenoh -e "packages/device-connect-server[dev,fuzz]"
pip install --only-binary eclipse-zenoh -e "packages/device-connect-agent-tools[dev,fuzz]"
pip install atheris
- name: Run atheris fuzz targets (all packages)
run: python packages/device-connect-edge/tests/fuzz/run_atheris.py --iterations=50000
- name: Publish atheris findings summary
if: always()
run: cat atheris-report.md >> "$GITHUB_STEP_SUMMARY"
- name: Upload atheris crashes and report
if: always()
uses: actions/upload-artifact@v5
with:
name: fuzz-atheris-results
path: |
crash-*
atheris-report.md
retention-days: 30

# ── Tier 1: Integration tests (no LLM) ────────────────────────
integration-tests:
runs-on: ubuntu-latest
Expand Down
10 changes: 10 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ build/
*.egg
.venv/
.vscode/
.DS_Store
.env
.pytest_cache/
.mypy_cache/

# Fuzz testing artifacts
crash-*
timeout-*
htmlcov/
.hypothesis/
atheris-report.md
fuzz-hypothesis-results.xml
**/fuzz/findings/
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,38 @@ def flatten_device(raw: Dict[str, Any]) -> Dict[str, Any]:
}


# ── Message parsing helpers (extracted for testability) ─────────────


def parse_buffered_payload(data: bytes) -> dict:
"""Parse raw bytes into a payload dict for buffered subscriptions.

Always returns a dict — falls back to ``{"raw": ...}`` on any error.
"""
try:
payload = json.loads(data.decode())
if not isinstance(payload, dict):
payload = {"raw": str(payload)[:500]}
except Exception:
payload = {"raw": data.decode("utf-8", errors="replace")[:500]}
return payload


def parse_event_payload(data: bytes) -> dict:
"""Parse raw bytes into a normalized event dict.

Returns dict with keys ``device_id``, ``event_name``, ``params``.
Raises on malformed input (caller should catch).
"""
payload = json.loads(data.decode())
if not isinstance(payload, dict):
raise ValueError("Expected JSON object")
method = payload.get("method", "")
dev_id = payload.get("params", {}).get("device_id", "unknown")
params = payload.get("params", {})
return {"device_id": dev_id, "event_name": method, "params": params}


# ── Connection class ────────────────────────────────────────────────


Expand Down Expand Up @@ -397,10 +429,7 @@ def subscribe_buffered(

async def _do_subscribe():
async def _on_msg(data: bytes, msg_subject: str, reply: str = ""):
try:
payload = json.loads(data.decode())
except Exception:
payload = {"raw": data.decode()[:500]}
payload = parse_buffered_payload(data)
# Store as (subject, data) tuple
self._inbox[name].append((msg_subject, payload))
# Trim to prevent unbounded growth
Expand Down Expand Up @@ -498,22 +527,13 @@ async def subscribe_events(

async def _on_msg(data: bytes, reply: str = ""):
try:
payload = json.loads(data.decode())
method = payload.get("method", "")
dev_id = payload.get("params", {}).get("device_id", "unknown")
event_name = method
params = payload.get("params", {})

event = parse_event_payload(data)
logger.info(
"EVENT <- %s::%s %s",
dev_id, event_name,
json.dumps(params, default=str),
event["device_id"], event["event_name"],
json.dumps(event["params"], default=str),
)
await buffer.put({
"device_id": dev_id,
"event_name": event_name,
"params": params,
})
await buffer.put(event)
except Exception as e:
logger.error("Error parsing event: %s", e)

Expand Down
7 changes: 7 additions & 0 deletions packages/device-connect-agent-tools/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,20 @@ dev = [
"pytest-asyncio>=0.23",
"pytest-timeout>=2.0",
]
fuzz = [
"hypothesis>=6.0",
"coverage>=7.0",
]

[tool.pytest.ini_options]
asyncio_mode = "strict"
markers = [
"integration: requires live Device Connect infrastructure (NATS/Zenoh + registry)",
]

[tool.setuptools.packages.find]
include = ["device_connect_agent_tools*"]

[project.urls]
Homepage = "https://github.com/arm/device-connect"
Repository = "https://github.com/arm/device-connect.git"
Expand Down
53 changes: 53 additions & 0 deletions packages/device-connect-agent-tools/tests/fuzz/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Fuzz Testing for Device Connect Agent Tools

Fuzz tests for the agent-tools package using [Hypothesis](https://hypothesis.readthedocs.io/) (property-based, pytest-integrated) and [Atheris](https://github.com/google/atheris) (coverage-guided, libFuzzer-based).

For full setup instructions (installing atheris on macOS/Linux, deep fuzzing, CI integration), see the [edge fuzz README](../../device-connect-edge/tests/fuzz/README.md).

## Fuzz Targets

| Target | Hypothesis | Atheris | What it tests |
|--------|-----------|---------|---------------|
| Tool Name Parsing | `test_fuzz_schema.py` | `fuzz_schema.py` | `parse_tool_name()` — MCP tool name splitting on `::` delimiter |
| JSON-RPC Parsing | `test_fuzz_jsonrpc_parsing.py` | `fuzz_jsonrpc_parsing.py` | Buffered message and event message parsing from `connection.py` |

## Running Locally

### Hypothesis (pytest)

```bash
cd packages/device-connect-agent-tools
pip install -e ".[dev,fuzz]"

# Run all fuzz tests — findings shown in terminal output
pytest tests/fuzz/test_fuzz_*.py -v

# More examples for deeper coverage
HYPOTHESIS_PROFILE=ci pytest tests/fuzz/test_fuzz_*.py -v
```

### Atheris

```bash
pip install atheris # Linux: works directly. macOS: see edge README for LLVM setup.

# Run individual targets directly
cd packages/device-connect-agent-tools
python tests/fuzz/fuzz_schema.py tests/fuzz/corpus/tool_names/ -max_total_time=300
python tests/fuzz/fuzz_jsonrpc_parsing.py tests/fuzz/corpus/jsonrpc_messages/ -max_total_time=300

# Deep fuzzing (1 hour per target, run indefinitely with no flags)
python tests/fuzz/fuzz_schema.py tests/fuzz/corpus/tool_names/ -max_total_time=3600

# Run all targets across ALL packages (from repo root) — writes atheris-report.md
python packages/device-connect-edge/tests/fuzz/run_atheris.py --iterations=50000
```

### Where to find results

| Tool | Where |
|------|-------|
| Hypothesis | Terminal output from pytest |
| Atheris (unified runner) | `atheris-report.md` at repo root |
| Atheris (direct) | `crash-<hash>` files in current directory |
| CI | GitHub Actions job summary (scroll down on run page) |
19 changes: 19 additions & 0 deletions packages/device-connect-agent-tools/tests/fuzz/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"""Hypothesis configuration for agent-tools fuzz tests."""

import os

from hypothesis import settings, HealthCheck

settings.register_profile(
"default",
max_examples=5000,
suppress_health_check=[HealthCheck.too_slow],
)

settings.register_profile(
"ci",
max_examples=20000,
suppress_health_check=[HealthCheck.too_slow],
)

settings.load_profile(os.getenv("HYPOTHESIS_PROFILE", "default"))
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"jsonrpc":"2.0","method":"event/objectDetected","params":{"device_id":"cam-01","label":"person","confidence":0.95}}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"jsonrpc":"2.0","id":"req-001","result":{"status":"ok","data":[1,2,3]}}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
camera-01::captureImage
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""Atheris fuzz target: JSON-RPC message parsing from connection layer.

Run:
python tests/fuzz/fuzz_jsonrpc_parsing.py tests/fuzz/corpus/jsonrpc_messages/ -max_total_time=300
"""

import sys
import json

import atheris

with atheris.instrument_imports():
from device_connect_agent_tools.connection import (
parse_buffered_payload,
parse_event_payload,
)


def TestOneInput(data: bytes) -> None:
# Always test buffered parsing (never crashes)
result = parse_buffered_payload(data)
assert isinstance(result, dict)

# Test event parsing (may fail on non-JSON)
try:
result = parse_event_payload(data)
assert isinstance(result, dict)
except (json.JSONDecodeError, UnicodeDecodeError, ValueError):
pass


def main():
atheris.Setup(sys.argv, TestOneInput)
atheris.Fuzz()


if __name__ == "__main__":
main()
35 changes: 35 additions & 0 deletions packages/device-connect-agent-tools/tests/fuzz/fuzz_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""Atheris fuzz target: MCP tool name parsing.

Run:
python tests/fuzz/fuzz_schema.py fuzz/corpus/tool_names/ -max_total_time=300
"""

import sys

import atheris

with atheris.instrument_imports():
from device_connect_agent_tools.mcp.schema import parse_tool_name


def TestOneInput(data: bytes) -> None:
try:
text = data.decode("utf-8", errors="replace")
except Exception:
return

try:
result = parse_tool_name(text)
assert isinstance(result, tuple)
assert len(result) == 2
except ValueError:
pass


def main():
atheris.Setup(sys.argv, TestOneInput)
atheris.Fuzz()


if __name__ == "__main__":
main()
Loading
Loading