Skip to content
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
62c93e2
Add multi-tenant deployment with broker-enforced isolation
atsyplikhin Apr 4, 2026
80c8db2
Add self-service tenant management web portal
atsyplikhin Apr 4, 2026
a3dd442
Improve portal UX: fix HTMX login redirect, show server IP, add start…
atsyplikhin Apr 4, 2026
b06d100
Fix device_id mismatch error when auto-detecting from credentials file
atsyplikhin Apr 4, 2026
bd37143
Fix dashboard data rendering, wildcard registry, expandable device de…
atsyplikhin Apr 4, 2026
e3d1406
Add RPC invocation from portal, fix param display, add emit example
atsyplikhin Apr 4, 2026
cdbe968
Add live event log streaming and relative last-seen timestamps
atsyplikhin Apr 4, 2026
a392883
Add greenhouse demo bundle, live event streaming, and UX fixes
atsyplikhin Apr 4, 2026
3ceb89d
Fix admin view-as-user: enable RPC invoke and event streaming
atsyplikhin Apr 4, 2026
0c71968
Add Zenoh as selectable messaging backend in portal bootstrap
atsyplikhin Apr 6, 2026
a7e967c
Add MQTT (Mosquitto) as third messaging backend option in portal
atsyplikhin Apr 6, 2026
606dda9
Add device connection and registration resilience
atsyplikhin Apr 7, 2026
f8c7bf6
Fix review findings: dedup, validation, encapsulation, tests
atsyplikhin Apr 7, 2026
c4a2b8d
Fix ruff lint errors: unused imports, unused variables, and import or…
atsyplikhin Apr 7, 2026
fda6f2b
Remove hardcoded admin credentials; rename compose file for consistency
atsyplikhin Apr 7, 2026
32ba533
Fix security and robustness issues found during PR review
atsyplikhin Apr 7, 2026
e8b357c
Harden portal security: input validation, XSS escaping, path traversa…
atsyplikhin Apr 7, 2026
e79eb33
Fix tenant isolation bypass, TOCTOU race, signup ordering, and other …
atsyplikhin Apr 7, 2026
5ac25f2
Fix login form rendering inside dashboard on session expiry
atsyplikhin Apr 8, 2026
22e2657
Add starter AI agent download to portal devices page (#16)
kavya-chennoju Apr 8, 2026
9b4e489
Fix nsc store_root path mismatch in containerized portal by ShaneilP
atsyplikhin Apr 8, 2026
edfde2f
Fix portal root route 404 and harden signup flow
atsyplikhin Apr 8, 2026
a84b76b
Fix etcd revision bloat from device heartbeats
atsyplikhin Apr 9, 2026
065fbe1
Update install URLs from feat/multitenant-deployment to main
atsyplikhin Apr 9, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 85 additions & 16 deletions packages/device-connect-edge/device_connect_edge/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ def __init__(
self.capabilities = caps_obj
self.identity = identity_payload
self.status = status_payload
self._explicit_device_id = device_id is not None
self.device_id = device_id or f"device-{uuid.uuid4().hex[:8]}"
if not re.match(r'^[a-zA-Z0-9][a-zA-Z0-9._-]{0,254}$', self.device_id):
raise ValueError(
Expand Down Expand Up @@ -405,6 +406,12 @@ def __init__(
self._setup_logger()
self._logger.info(f"Using device_id from credentials file: {self.device_id}")

# Override tenant from credentials file if caller used the default
creds_tenant = creds.get("tenant")
if creds_tenant and tenant == "default":
self.tenant = creds_tenant
self._logger.info(f"Using tenant from credentials file: {self.tenant}")

# ===== Messaging Configuration =====

# D2D mode: no infrastructure needed (Zenoh multicast scouting)
Expand Down Expand Up @@ -491,6 +498,7 @@ def __init__(
self._registration_expires_at: float = 0.0
self._heartbeat_interval: float = heartbeat_interval or max(1.0, self.ttl / 3)
self._registration_lock: asyncio.Lock = asyncio.Lock()
self._subscription_lock: asyncio.Lock = asyncio.Lock()

# Messaging client (initialized in run(); first assignment at line 350)

Expand Down Expand Up @@ -581,8 +589,9 @@ def _load_credentials(self, credentials_file: str, messaging_urls: Optional[list
# Try to parse as JSON first
try:
creds = json.loads(content)
# Validate device_id matches (skip if allow_insecure)
if not self.allow_insecure:
# Validate device_id matches (skip if allow_insecure or
# device_id will be auto-detected from this file)
if not self.allow_insecure and self._explicit_device_id:
self._validate_device_id_from_creds(creds)
return creds
except json.JSONDecodeError:
Expand Down Expand Up @@ -927,6 +936,23 @@ async def enqueue_event(self, event: str, payload: dict) -> None:
self._logger.error("Event queue still full after drop; event lost: %s", event)


def _build_registration_params(self) -> dict:
"""Build the registration payload shared by _register and requestRegistration."""
caps = self._driver.capabilities if self._driver else self.capabilities
params = {
"device_id": self.device_id,
"device_ttl": self.ttl,
"capabilities": caps.model_dump(),
"identity": self.identity,
"status": {
**self.status,
"ts": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
},
}
if hasattr(self, '_attestation_token') and self._attestation_token:
params["attestation"] = self._attestation_token
return params

async def _register(self, force: bool = False) -> None:
"""Register the device with the Device Connect registry, retrying on failure."""

Expand All @@ -947,18 +973,7 @@ async def _register(self, force: bool = False) -> None:
delay = 1 # initial retry delay in seconds
while True:
req_id = f"{self.device_id}-{int(time.time()*1000)}"
# Get capabilities dynamically from driver if available (supports runtime capability loading)
caps = self._driver.capabilities if self._driver else self.capabilities
params = {
"device_id": self.device_id,
"device_ttl": self.ttl,
"capabilities": caps.model_dump(),
"identity": self.identity,
"status": {
**self.status,
"ts": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
},
}
params = self._build_registration_params()
try:
self._logger.info("Registering device")
response_data = await self.messaging.request(
Expand Down Expand Up @@ -997,8 +1012,10 @@ async def _heartbeat_loop(self) -> None:
try:
if self.messaging.is_closed:
await self._connect_messaging()
reconnect_delay = 1
while not self.messaging.is_connected:
await asyncio.sleep(1)
await asyncio.sleep(reconnect_delay)
reconnect_delay = min(reconnect_delay * 2, 30)
except Exception as e:
self._logger.error("Heartbeat reconnect failed: %s, will retry next interval", e)
await asyncio.sleep(self._heartbeat_interval)
Expand All @@ -1007,7 +1024,7 @@ async def _heartbeat_loop(self) -> None:
try:
await self._register(force=True)
except Exception as e:
self._logger.error("Device re-registration failed after reconnect: %s", e)
self._logger.error("Re-registration after reconnect failed: %s", e)

# Send heartbeat
try:
Expand Down Expand Up @@ -1044,6 +1061,15 @@ async def on_msg(data: bytes, reply_subject: Optional[str]):

params_dict = payload.get("params", {})

# Built-in runtime method: registry pulls registration info
if method == "requestRegistration":
if reply_subject:
result = self._build_registration_params()
await self.messaging.publish(
reply_subject, build_rpc_response(payload["id"], result)
)
return

# Extract trace metadata for cross-device RPC correlation
dc_meta = params_dict.pop("_dc_meta", {})
source_device = dc_meta.get("source_device")
Expand Down Expand Up @@ -1155,6 +1181,7 @@ async def on_reconnect():
self._logger.info(f"{self._messaging_backend.upper()} reconnected")
if not self._d2d_mode:
self._track_task(asyncio.create_task(self._register(force=True)))
self._track_task(asyncio.create_task(self._resubscribe_after_reconnect()))
await self._notify_conn_state(True)

# Create messaging client based on backend
Expand Down Expand Up @@ -1586,6 +1613,48 @@ async def _teardown_agentic_driver(self) -> None:
self._logger.debug("Tearing down DeviceDriver subscriptions")
await self._driver.teardown_subscriptions()

async def _resubscribe_after_reconnect(self) -> None:
"""Re-establish event subscriptions after a messaging reconnect.

After extended disconnections (e.g. laptop sleep), auto-resubscribe
may not restore all subscriptions. This explicitly tears down and
recreates ``@on`` event subscriptions with exponential backoff.

Uses ``_subscription_lock`` to prevent concurrent invocations
from rapid reconnects.
"""
if not self._subscription_lock.acquire_nowait():
self._logger.debug("Subscription re-establishment already in progress, skipping")
return

try:
delay = 1
while True:
try:
if not self.messaging.is_connected:
await asyncio.sleep(1)
continue

if self._driver is not None:
try:
from device_connect_edge.drivers.base import DeviceDriver
if isinstance(self._driver, DeviceDriver):
self._logger.info("Re-establishing event subscriptions after reconnect")
await self._driver.teardown_subscriptions()
await self._driver.setup_subscriptions()
self._logger.info("Event subscriptions re-established")
except ImportError:
pass
break
except Exception as e:
self._logger.warning(
"Subscription re-establishment failed: %s; retrying in %ss", e, delay
)
await asyncio.sleep(delay)
delay = min(delay * 2, 30)
finally:
self._subscription_lock.release()

def _handle_registration_reply(self, data: bytes) -> None:
"""Parse registry response and update local registration metadata."""

Expand Down
62 changes: 41 additions & 21 deletions packages/device-connect-edge/device_connect_edge/registry_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

from __future__ import annotations

import asyncio
import json
import logging
import time
Expand Down Expand Up @@ -74,30 +75,49 @@ async def _request(
method: str,
params: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
retries: int = 3,
) -> Any:
"""Send a JSON-RPC 2.0 request and return the result."""
"""Send a JSON-RPC 2.0 request and return the result.

Retries on ``RequestTimeoutError`` with exponential backoff.
Server-side errors (``RuntimeError``) are raised immediately.
"""
timeout = timeout or self._timeout
req_id = f"rpc-{uuid.uuid4().hex[:12]}"
payload: Dict[str, Any] = {
"jsonrpc": "2.0",
"id": req_id,
"method": method,
}
if params:
payload["params"] = params

response_data = await self._client.request(
subject, json.dumps(payload).encode(), timeout=timeout,
)
response = json.loads(response_data)
delay = 1
last_err: Optional[Exception] = None
for attempt in range(1, retries + 1):
req_id = f"rpc-{uuid.uuid4().hex[:12]}"
payload: Dict[str, Any] = {
"jsonrpc": "2.0",
"id": req_id,
"method": method,
}
if params:
payload["params"] = params

try:
response_data = await self._client.request(
subject, json.dumps(payload).encode(), timeout=timeout,
)
response = json.loads(response_data)

if "error" in response:
error = response["error"]
raise RuntimeError(
f"Registry error ({error.get('code', -1)}): "
f"{error.get('message', 'Unknown error')}"
)
return response.get("result")
if "error" in response:
error = response["error"]
raise RuntimeError(
f"Registry error ({error.get('code', -1)}): "
f"{error.get('message', 'Unknown error')}"
)
return response.get("result")
except RequestTimeoutError as e:
last_err = e
if attempt < retries:
logger.warning(
"Registry request %s timed out (attempt %d/%d), retrying in %ss",
method, attempt, retries, delay,
)
await asyncio.sleep(delay)
delay = min(delay * 2, 30)
raise last_err # type: ignore[misc]

# ── DiscoveryProvider interface ──────────────────────────────────

Expand Down
87 changes: 87 additions & 0 deletions packages/device-connect-edge/tests/test_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,3 +530,90 @@ async def test_departure_published_on_shutdown(self):
assert payload["departing"] is True
assert payload["device_id"] == "cam-01"
assert payload["tenant"] == "lab"


# ── _build_registration_params ──────────────────────────────────

class TestBuildRegistrationParams:
def test_returns_expected_structure(self):
driver = StubDriver()
rt = DeviceRuntime(
driver=driver,
device_id="sensor-42",
tenant="lab",
messaging_urls=["nats://localhost:4222"],
)
rt._attestation_token = "tok-abc-123"

params = rt._build_registration_params()

assert params["device_id"] == "sensor-42"
assert params["device_ttl"] == rt.ttl
assert "functions" in params["capabilities"]
assert "events" in params["capabilities"]
assert params["identity"]["device_type"] == "stub"
assert params["identity"]["manufacturer"] == "Test"
assert "ts" in params["status"]
assert params["status"]["location"] == "lab"
assert params["attestation"] == "tok-abc-123"

def test_no_attestation_when_unset(self):
driver = StubDriver()
rt = DeviceRuntime(
driver=driver,
device_id="sensor-43",
messaging_urls=["nats://localhost:4222"],
)

params = rt._build_registration_params()

assert "attestation" not in params


# ── _cmd_subscription requestRegistration ───────────────────────

class TestCmdSubscriptionRequestRegistration:
@pytest.mark.asyncio
async def test_request_registration_returns_payload(self):
"""requestRegistration built-in RPC publishes registration params to reply_subject."""
driver = StubDriver()
rt = DeviceRuntime(
driver=driver,
device_id="cam-99",
tenant="test",
messaging_urls=["nats://localhost:4222"],
)
rt.messaging = AsyncMock()

# Build a requestRegistration JSON-RPC message
rpc_msg = json.dumps({
"jsonrpc": "2.0",
"id": "req-1",
"method": "requestRegistration",
"params": {},
}).encode()

# Subscribe captures the on_msg callback
rt.messaging.subscribe = AsyncMock()
await rt._cmd_subscription()
on_msg = rt.messaging.subscribe.call_args[1]["callback"]

# Invoke the callback with a reply_subject
await on_msg(rpc_msg, reply_subject="reply.inbox.1")

# Verify messaging.publish was called with the reply_subject
rt.messaging.publish.assert_called_once()
call_args = rt.messaging.publish.call_args[0]
assert call_args[0] == "reply.inbox.1"

# The response should be a valid JSON-RPC response with registration params
response = json.loads(call_args[1])
assert response["jsonrpc"] == "2.0"
assert response["id"] == "req-1"
result = response["result"]
assert result["device_id"] == "cam-99"
assert result["device_ttl"] == rt.ttl
assert "capabilities" in result
assert "identity" in result
assert "status" in result
assert "ts" in result["status"]
Loading