Skip to content

Commit 444304e

Browse files
committed
Add multi-tenant deployment with self-service portal (#18)
Multi-tenant deployment with broker-enforced isolation, self-service tenant management web portal, device connection resilience, support for Zenoh/NATS/MQTT messaging backends, live event streaming, RPC invocation from portal, greenhouse demo bundle, and security hardening.
1 parent 539cca0 commit 444304e

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

74 files changed

+8281
-144
lines changed

packages/device-connect-edge/device_connect_edge/device.py

Lines changed: 85 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,7 @@ def __init__(
326326
self.capabilities = caps_obj
327327
self.identity = identity_payload
328328
self.status = status_payload
329+
self._explicit_device_id = device_id is not None
329330
self.device_id = device_id or f"device-{uuid.uuid4().hex[:8]}"
330331
if not re.match(r'^[a-zA-Z0-9][a-zA-Z0-9._-]{0,254}$', self.device_id):
331332
raise ValueError(
@@ -405,6 +406,12 @@ def __init__(
405406
self._setup_logger()
406407
self._logger.info(f"Using device_id from credentials file: {self.device_id}")
407408

409+
# Override tenant from credentials file if caller used the default
410+
creds_tenant = creds.get("tenant")
411+
if creds_tenant and tenant == "default":
412+
self.tenant = creds_tenant
413+
self._logger.info(f"Using tenant from credentials file: {self.tenant}")
414+
408415
# ===== Messaging Configuration =====
409416

410417
# D2D mode: no infrastructure needed (Zenoh multicast scouting)
@@ -491,6 +498,7 @@ def __init__(
491498
self._registration_expires_at: float = 0.0
492499
self._heartbeat_interval: float = heartbeat_interval or max(1.0, self.ttl / 3)
493500
self._registration_lock: asyncio.Lock = asyncio.Lock()
501+
self._subscription_lock: asyncio.Lock = asyncio.Lock()
494502

495503
# Messaging client (initialized in run(); first assignment at line 350)
496504

@@ -581,8 +589,9 @@ def _load_credentials(self, credentials_file: str, messaging_urls: Optional[list
581589
# Try to parse as JSON first
582590
try:
583591
creds = json.loads(content)
584-
# Validate device_id matches (skip if allow_insecure)
585-
if not self.allow_insecure:
592+
# Validate device_id matches (skip if allow_insecure or
593+
# device_id will be auto-detected from this file)
594+
if not self.allow_insecure and self._explicit_device_id:
586595
self._validate_device_id_from_creds(creds)
587596
return creds
588597
except json.JSONDecodeError:
@@ -927,6 +936,23 @@ async def enqueue_event(self, event: str, payload: dict) -> None:
927936
self._logger.error("Event queue still full after drop; event lost: %s", event)
928937

929938

939+
def _build_registration_params(self) -> dict:
    """Build the registration payload shared by _register and requestRegistration.

    Capabilities are read from the driver when one is attached (this
    supports runtime capability loading); otherwise the capabilities
    stored on the runtime are used.

    Returns:
        A dict with ``device_id``, ``device_ttl``, ``capabilities``,
        ``identity`` and ``status`` keys. ``status`` is a copy of
        ``self.status`` with a fresh UTC ``ts`` timestamp added, so the
        stored status dict is never mutated. An ``attestation`` key is
        included only when a non-empty attestation token is set.
    """
    caps = self._driver.capabilities if self._driver else self.capabilities
    params = {
        "device_id": self.device_id,
        "device_ttl": self.ttl,
        "capabilities": caps.model_dump(),
        "identity": self.identity,
        "status": {
            **self.status,
            "ts": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        },
    }
    # The token attribute may never have been assigned; a single getattr
    # with a default covers both "missing" and "empty".
    attestation = getattr(self, "_attestation_token", None)
    if attestation:
        params["attestation"] = attestation
    return params
955+
930956
async def _register(self, force: bool = False) -> None:
931957
"""Register the device with the Device Connect registry, retrying on failure."""
932958

@@ -947,18 +973,7 @@ async def _register(self, force: bool = False) -> None:
947973
delay = 1 # initial retry delay in seconds
948974
while True:
949975
req_id = f"{self.device_id}-{int(time.time()*1000)}"
950-
# Get capabilities dynamically from driver if available (supports runtime capability loading)
951-
caps = self._driver.capabilities if self._driver else self.capabilities
952-
params = {
953-
"device_id": self.device_id,
954-
"device_ttl": self.ttl,
955-
"capabilities": caps.model_dump(),
956-
"identity": self.identity,
957-
"status": {
958-
**self.status,
959-
"ts": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
960-
},
961-
}
976+
params = self._build_registration_params()
962977
try:
963978
self._logger.info("Registering device")
964979
response_data = await self.messaging.request(
@@ -997,8 +1012,10 @@ async def _heartbeat_loop(self) -> None:
9971012
try:
9981013
if self.messaging.is_closed:
9991014
await self._connect_messaging()
1015+
reconnect_delay = 1
10001016
while not self.messaging.is_connected:
1001-
await asyncio.sleep(1)
1017+
await asyncio.sleep(reconnect_delay)
1018+
reconnect_delay = min(reconnect_delay * 2, 30)
10021019
except Exception as e:
10031020
self._logger.error("Heartbeat reconnect failed: %s, will retry next interval", e)
10041021
await asyncio.sleep(self._heartbeat_interval)
@@ -1007,7 +1024,7 @@ async def _heartbeat_loop(self) -> None:
10071024
try:
10081025
await self._register(force=True)
10091026
except Exception as e:
1010-
self._logger.error("Device re-registration failed after reconnect: %s", e)
1027+
self._logger.error("Re-registration after reconnect failed: %s", e)
10111028

10121029
# Send heartbeat
10131030
try:
@@ -1044,6 +1061,15 @@ async def on_msg(data: bytes, reply_subject: Optional[str]):
10441061

10451062
params_dict = payload.get("params", {})
10461063

1064+
# Built-in runtime method: registry pulls registration info
1065+
if method == "requestRegistration":
1066+
if reply_subject:
1067+
result = self._build_registration_params()
1068+
await self.messaging.publish(
1069+
reply_subject, build_rpc_response(payload["id"], result)
1070+
)
1071+
return
1072+
10471073
# Extract trace metadata for cross-device RPC correlation
10481074
dc_meta = params_dict.pop("_dc_meta", {})
10491075
source_device = dc_meta.get("source_device")
@@ -1155,6 +1181,7 @@ async def on_reconnect():
11551181
self._logger.info(f"{self._messaging_backend.upper()} reconnected")
11561182
if not self._d2d_mode:
11571183
self._track_task(asyncio.create_task(self._register(force=True)))
1184+
self._track_task(asyncio.create_task(self._resubscribe_after_reconnect()))
11581185
await self._notify_conn_state(True)
11591186

11601187
# Create messaging client based on backend
@@ -1586,6 +1613,48 @@ async def _teardown_agentic_driver(self) -> None:
15861613
self._logger.debug("Tearing down DeviceDriver subscriptions")
15871614
await self._driver.teardown_subscriptions()
15881615

1616+
async def _resubscribe_after_reconnect(self) -> None:
    """Re-establish event subscriptions after a messaging reconnect.

    After extended disconnections (e.g. laptop sleep), auto-resubscribe
    may not restore all subscriptions. This explicitly tears down and
    recreates ``@on`` event subscriptions with exponential backoff
    (1s doubling up to 30s) until it succeeds.

    Uses ``_subscription_lock`` to prevent concurrent invocations
    from rapid reconnects: if a re-establishment is already running,
    this call logs at debug level and returns immediately.
    """
    # asyncio.Lock has no acquire_nowait(); locked() is the non-blocking
    # probe. No await happens between this check and the acquisition
    # below, and acquiring a free asyncio.Lock does not suspend, so
    # another task cannot take the lock in between.
    if self._subscription_lock.locked():
        self._logger.debug("Subscription re-establishment already in progress, skipping")
        return

    async with self._subscription_lock:
        delay = 1
        while True:
            try:
                if not self.messaging.is_connected:
                    # Connection dropped again; poll until it is back.
                    await asyncio.sleep(1)
                    continue

                if self._driver is not None:
                    try:
                        from device_connect_edge.drivers.base import DeviceDriver
                        if isinstance(self._driver, DeviceDriver):
                            self._logger.info("Re-establishing event subscriptions after reconnect")
                            await self._driver.teardown_subscriptions()
                            await self._driver.setup_subscriptions()
                            self._logger.info("Event subscriptions re-established")
                    except ImportError:
                        # Driver support is not importable; nothing to resubscribe.
                        pass
                break
            except Exception as e:
                self._logger.warning(
                    "Subscription re-establishment failed: %s; retrying in %ss", e, delay
                )
                await asyncio.sleep(delay)
                delay = min(delay * 2, 30)
1657+
15891658
def _handle_registration_reply(self, data: bytes) -> None:
15901659
"""Parse registry response and update local registration metadata."""
15911660

packages/device-connect-edge/device_connect_edge/registry_client.py

Lines changed: 41 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
from __future__ import annotations
2525

26+
import asyncio
2627
import json
2728
import logging
2829
import time
@@ -74,30 +75,49 @@ async def _request(
7475
method: str,
7576
params: Optional[Dict[str, Any]] = None,
7677
timeout: Optional[float] = None,
78+
retries: int = 3,
7779
) -> Any:
78-
"""Send a JSON-RPC 2.0 request and return the result."""
80+
"""Send a JSON-RPC 2.0 request and return the result.
81+
82+
Retries on ``RequestTimeoutError`` with exponential backoff.
83+
Server-side errors (``RuntimeError``) are raised immediately.
84+
"""
7985
timeout = timeout or self._timeout
80-
req_id = f"rpc-{uuid.uuid4().hex[:12]}"
81-
payload: Dict[str, Any] = {
82-
"jsonrpc": "2.0",
83-
"id": req_id,
84-
"method": method,
85-
}
86-
if params:
87-
payload["params"] = params
88-
89-
response_data = await self._client.request(
90-
subject, json.dumps(payload).encode(), timeout=timeout,
91-
)
92-
response = json.loads(response_data)
86+
delay = 1
87+
last_err: Optional[Exception] = None
88+
for attempt in range(1, retries + 1):
89+
req_id = f"rpc-{uuid.uuid4().hex[:12]}"
90+
payload: Dict[str, Any] = {
91+
"jsonrpc": "2.0",
92+
"id": req_id,
93+
"method": method,
94+
}
95+
if params:
96+
payload["params"] = params
97+
98+
try:
99+
response_data = await self._client.request(
100+
subject, json.dumps(payload).encode(), timeout=timeout,
101+
)
102+
response = json.loads(response_data)
93103

94-
if "error" in response:
95-
error = response["error"]
96-
raise RuntimeError(
97-
f"Registry error ({error.get('code', -1)}): "
98-
f"{error.get('message', 'Unknown error')}"
99-
)
100-
return response.get("result")
104+
if "error" in response:
105+
error = response["error"]
106+
raise RuntimeError(
107+
f"Registry error ({error.get('code', -1)}): "
108+
f"{error.get('message', 'Unknown error')}"
109+
)
110+
return response.get("result")
111+
except RequestTimeoutError as e:
112+
last_err = e
113+
if attempt < retries:
114+
logger.warning(
115+
"Registry request %s timed out (attempt %d/%d), retrying in %ss",
116+
method, attempt, retries, delay,
117+
)
118+
await asyncio.sleep(delay)
119+
delay = min(delay * 2, 30)
120+
raise last_err # type: ignore[misc]
101121

102122
# ── DiscoveryProvider interface ──────────────────────────────────
103123

packages/device-connect-edge/tests/test_device.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -530,3 +530,90 @@ async def test_departure_published_on_shutdown(self):
530530
assert payload["departing"] is True
531531
assert payload["device_id"] == "cam-01"
532532
assert payload["tenant"] == "lab"
533+
534+
535+
# ── _build_registration_params ──────────────────────────────────
536+
537+
class TestBuildRegistrationParams:
    """Checks on the payload produced by ``DeviceRuntime._build_registration_params``."""

    def test_returns_expected_structure(self):
        """Payload carries id, ttl, capabilities, identity, status and attestation."""
        runtime = DeviceRuntime(
            driver=StubDriver(),
            device_id="sensor-42",
            tenant="lab",
            messaging_urls=["nats://localhost:4222"],
        )
        runtime._attestation_token = "tok-abc-123"

        payload = runtime._build_registration_params()
        capabilities = payload["capabilities"]
        identity = payload["identity"]
        status = payload["status"]

        assert payload["device_id"] == "sensor-42"
        assert payload["device_ttl"] == runtime.ttl
        assert "functions" in capabilities
        assert "events" in capabilities
        assert identity["device_type"] == "stub"
        assert identity["manufacturer"] == "Test"
        assert "ts" in status
        assert status["location"] == "lab"
        assert payload["attestation"] == "tok-abc-123"

    def test_no_attestation_when_unset(self):
        """Without an attestation token the payload has no ``attestation`` key."""
        runtime = DeviceRuntime(
            driver=StubDriver(),
            device_id="sensor-43",
            messaging_urls=["nats://localhost:4222"],
        )

        payload = runtime._build_registration_params()

        assert "attestation" not in payload
571+
572+
573+
# ── _cmd_subscription requestRegistration ───────────────────────
574+
575+
class TestCmdSubscriptionRequestRegistration:
    """Covers the built-in ``requestRegistration`` RPC handled by the command subscription."""

    @pytest.mark.asyncio
    async def test_request_registration_returns_payload(self):
        """requestRegistration built-in RPC publishes registration params to reply_subject."""
        runtime = DeviceRuntime(
            driver=StubDriver(),
            device_id="cam-99",
            tenant="test",
            messaging_urls=["nats://localhost:4222"],
        )
        # Mocked messaging client; subscribe() is mocked so the on_msg
        # callback handed to it can be captured below.
        messaging = AsyncMock()
        messaging.subscribe = AsyncMock()
        runtime.messaging = messaging

        # JSON-RPC request asking the device for its registration info.
        request = {
            "jsonrpc": "2.0",
            "id": "req-1",
            "method": "requestRegistration",
            "params": {},
        }
        raw_request = json.dumps(request).encode()

        await runtime._cmd_subscription()
        handler = runtime.messaging.subscribe.call_args.kwargs["callback"]

        # Deliver the request with a reply subject, as a requester would.
        await handler(raw_request, reply_subject="reply.inbox.1")

        # Exactly one reply must be published, addressed to the reply subject.
        runtime.messaging.publish.assert_called_once()
        published = runtime.messaging.publish.call_args.args
        assert published[0] == "reply.inbox.1"

        # The reply is a JSON-RPC response whose result is the registration payload.
        reply = json.loads(published[1])
        assert reply["jsonrpc"] == "2.0"
        assert reply["id"] == "req-1"
        registration = reply["result"]
        assert registration["device_id"] == "cam-99"
        assert registration["device_ttl"] == runtime.ttl
        assert "capabilities" in registration
        assert "identity" in registration
        assert "status" in registration
        assert "ts" in registration["status"]

0 commit comments

Comments
 (0)