diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e3fb4fe..0a90afc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -99,6 +99,48 @@ jobs: TWINE_PASSWORD: ${{ secrets.PYPI_API_TOKEN }} run: python -m twine upload dist/* + publish-ghcr-dev: + name: Publish dev image to GHCR + needs: tests + runs-on: ubuntu-latest + if: github.event_name == 'pull_request' + # Require manual approval only for PRs from forks (untrusted code) + environment: ${{ github.event.pull_request.head.repo.full_name != github.repository && 'dev-deploy' || '' }} + permissions: + contents: read + packages: write + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Set up QEMU + uses: docker/setup-qemu-action@v3 + with: + platforms: linux/amd64,linux/arm64 + + - name: Log in to GHCR + uses: docker/login-action@v3 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Build and push dev image + uses: docker/build-push-action@v5 + with: + context: . + push: true + platforms: linux/amd64,linux/arm64 + tags: ghcr.io/${{ github.repository_owner }}/meshcore-proxy:dev-${{ github.event.pull_request.number }} + labels: | + org.opencontainers.image.source=https://github.com/${{ github.repository }} + org.opencontainers.image.revision=${{ github.sha }} + build-args: | + MESHCORE_PROXY_VERSION=0.0.0.dev${{ github.event.pull_request.number }} + publish-ghcr: name: Publish to GHCR needs: build diff --git a/Dockerfile b/Dockerfile index 2474d3d..5eb1a86 100644 --- a/Dockerfile +++ b/Dockerfile @@ -34,6 +34,6 @@ USER meshcore # Default port EXPOSE 5000 -# Default command (override with actual connection args) +# Run the proxy - configure via environment variables or command args ENTRYPOINT ["meshcore-proxy"] -CMD ["--help"] +CMD [] diff --git a/docker-compose.yml b/docker-compose.yml index 7ceeddc..b8bdc69 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,6 +3,15 @@ # Usage: # USB Serial: docker compose --profile serial up # BLE: docker compose --profile ble up +# +# Environment variables (can be set in .env file or shell): +# SERIAL_PORT - Serial port path (default: /dev/ttyUSB0) +# BLE_ADDRESS - BLE device MAC address +# TCP_PORT - TCP server port (default: 5000) +# BLE_PIN - BLE pairing PIN (default: 123456) +# LOG_LEVEL - Log verbosity: off, error, warning, info, debug, verbose (default: info) +# LOG_JSON - Output logs as JSON: true/false +# VIRTUALIZE_CHANNELS - Enable channel slot virtualization: true/false services: # USB Serial connection profile @@ -10,10 +19,15 @@ services: image: ghcr.io/rgregg/meshcore-proxy:latest profiles: ["serial"] ports: - - "5000:5000" + - "${TCP_PORT:-5000}:${TCP_PORT:-5000}" devices: - - "/dev/ttyUSB0:/dev/ttyUSB0" - command: ["--serial", "/dev/ttyUSB0", "--port", "5000"] + - "${SERIAL_PORT:-/dev/ttyUSB0}:${SERIAL_PORT:-/dev/ttyUSB0}" + environment: + - SERIAL_PORT=${SERIAL_PORT:-/dev/ttyUSB0} + - TCP_PORT=${TCP_PORT:-5000} + - VIRTUALIZE_CHANNELS=${VIRTUALIZE_CHANNELS:-} + - LOG_LEVEL=${LOG_LEVEL:-} + - LOG_JSON=${LOG_JSON:-} restart: unless-stopped # BLE connection profile (requires host network for Bluetooth) @@ -26,5 +40,9 @@ services: - /var/run/dbus:/var/run/dbus:ro environment: - BLE_ADDRESS=${BLE_ADDRESS:-} - command: ["--ble", "${BLE_ADDRESS}", "--port", "5000"] # don't need to expose PORT because of host networking + - BLE_PIN=${BLE_PIN:-123456} + - TCP_PORT=${TCP_PORT:-5000} + - VIRTUALIZE_CHANNELS=${VIRTUALIZE_CHANNELS:-} + - LOG_LEVEL=${LOG_LEVEL:-} + - LOG_JSON=${LOG_JSON:-} restart: unless-stopped diff --git a/src/meshcore_proxy/channel_virtualizer.py b/src/meshcore_proxy/channel_virtualizer.py new file mode 100644 index 0000000..e7d388b --- /dev/null +++ b/src/meshcore_proxy/channel_virtualizer.py @@ -0,0 +1,221 @@ +# src/meshcore_proxy/channel_virtualizer.py +"""Channel slot virtualization for multi-client MeshCore proxy.""" + +import logging +import time +from dataclasses import dataclass, field + +logger = logging.getLogger(__name__) + +# Command codes +CMD_SET_CHANNEL = 0x20 +CMD_SEND_CHAN_MSG = 0x03 +CMD_GET_CHANNEL = 0x1F + +# Response codes with channel indices +RESP_CHANNEL_INFO = 0x12 +RESP_CHANNEL_MSG_RECV = 0x08 +RESP_CHANNEL_MSG_RECV_V3 = 0x11 + + +@dataclass +class PhysicalSlot: + """A physical channel slot on the radio.""" + + physical_idx: int + config: bytes # raw channel config (name + secret, bytes 2-49 of SET_CHANNEL) + clients: set = field(default_factory=set) # set of (client_addr, virtual_idx) + last_used: float = field(default_factory=time.monotonic) + + +class ChannelSlotAllocator: + """Manages virtual-to-physical channel slot mapping with dedup and LRU eviction.""" + + def __init__(self, max_slots: int = 40): + self._max_slots = max_slots + self._physical_slots: dict[int, PhysicalSlot] = {} + self._virtual_to_physical: dict[tuple, int] = {} + self._available_slots: set[int] = set(range(max_slots)) + + def process_outgoing(self, client_addr: tuple, payload: bytes) -> bytes | None: + """Process an outgoing command, rewriting channel indices if needed. + + Returns: + Rewritten payload to send to radio, or None if command should be + suppressed (dedup hit on SET_CHANNEL). + """ + if not payload: + return payload + + cmd = payload[0] + + if cmd == CMD_SET_CHANNEL: + return self._handle_set_channel(client_addr, payload) + elif cmd == CMD_SEND_CHAN_MSG: + return self._handle_send_chan_msg(client_addr, payload) + elif cmd == CMD_GET_CHANNEL: + return self._handle_get_channel(client_addr, payload) + else: + return payload + + def _handle_set_channel(self, client_addr: tuple, payload: bytes) -> bytes | None: + """Handle SET_CHANNEL: allocate/dedup physical slot, rewrite index.""" + virtual_idx = payload[1] + config = payload[2:] # name (32 bytes) + secret (16 bytes) + client_key = (client_addr, virtual_idx) + + # Step 1: Handle reassignment - release old mapping first + if client_key in self._virtual_to_physical: + old_physical = self._virtual_to_physical.pop(client_key) + if old_physical in self._physical_slots: + slot = self._physical_slots[old_physical] + slot.clients.discard(client_key) + if not slot.clients: + del self._physical_slots[old_physical] + self._available_slots.add(old_physical) + logger.debug( + f"Released physical slot {old_physical} (no remaining clients)" + ) + + # Step 2: Check for dedup match + for phys_idx, slot in self._physical_slots.items(): + if slot.config == config: + slot.clients.add(client_key) + slot.last_used = time.monotonic() + self._virtual_to_physical[client_key] = phys_idx + logger.debug( + f"Dedup: {client_addr} virtual {virtual_idx} -> " + f"existing physical {phys_idx}" + ) + return None # No radio command needed + + # Step 3: Allocate new physical slot + if not self._available_slots: + self._evict_lru() + + physical_idx = min(self._available_slots) + self._available_slots.remove(physical_idx) + + self._physical_slots[physical_idx] = PhysicalSlot( + physical_idx=physical_idx, + config=config, + clients={client_key}, + ) + self._virtual_to_physical[client_key] = physical_idx + + logger.debug( + f"Allocated: {client_addr} virtual {virtual_idx} -> physical {physical_idx}" + ) + + # Rewrite the index byte + return payload[0:1] + physical_idx.to_bytes(1, "little") + config + + def _handle_send_chan_msg(self, client_addr: tuple, payload: bytes) -> bytes: + """Handle SEND_CHAN_MSG: rewrite channel index if mapped.""" + virtual_idx = payload[2] + client_key = (client_addr, virtual_idx) + + physical_idx = self._virtual_to_physical.get(client_key) + if physical_idx is None: + logger.warning( + f"SEND_CHAN_MSG from {client_addr} for unmapped virtual slot " + f"{virtual_idx}, passing through" + ) + return payload + + # Update last_used timestamp + if physical_idx in self._physical_slots: + self._physical_slots[physical_idx].last_used = time.monotonic() + + # Rewrite: byte 0 (cmd) + byte 1 (flags) + byte 2 (chan idx) + rest + return payload[0:2] + physical_idx.to_bytes(1, "little") + payload[3:] + + def _handle_get_channel(self, client_addr: tuple, payload: bytes) -> bytes: + """Handle GET_CHANNEL: rewrite channel index if mapped.""" + virtual_idx = payload[1] + client_key = (client_addr, virtual_idx) + + physical_idx = self._virtual_to_physical.get(client_key) + if physical_idx is None: + return payload # Pass through unmapped + + return payload[0:1] + physical_idx.to_bytes(1, "little") + + def process_incoming(self, client_addr: tuple, payload: bytes) -> bytes: + """Rewrite channel indices in radio responses for a specific client. + + Returns the payload with physical indices replaced by virtual indices + where a mapping exists. + """ + if not payload: + return payload + + resp_type = payload[0] + + if resp_type in (RESP_CHANNEL_INFO, RESP_CHANNEL_MSG_RECV, RESP_CHANNEL_MSG_RECV_V3): + return self._rewrite_response_channel_idx(client_addr, payload, idx_offset=1) + else: + return payload + + def _rewrite_response_channel_idx( + self, client_addr: tuple, payload: bytes, idx_offset: int + ) -> bytes: + """Rewrite a channel index in a response payload for a specific client.""" + if len(payload) <= idx_offset: + return payload + + physical_idx = payload[idx_offset] + + if physical_idx not in self._physical_slots: + return payload + + slot = self._physical_slots[physical_idx] + # Find this client's virtual index for this physical slot + for (addr, virtual_idx) in slot.clients: + if addr == client_addr: + return ( + payload[:idx_offset] + + virtual_idx.to_bytes(1, "little") + + payload[idx_offset + 1 :] + ) + + # Client has no mapping for this physical slot - pass through + return payload + + def remove_client(self, client_addr: tuple) -> None: + """Remove all mappings for a disconnected client.""" + keys_to_remove = [ + key for key in self._virtual_to_physical if key[0] == client_addr + ] + for client_key in keys_to_remove: + physical_idx = self._virtual_to_physical.pop(client_key) + if physical_idx in self._physical_slots: + slot = self._physical_slots[physical_idx] + slot.clients.discard(client_key) + if not slot.clients: + del self._physical_slots[physical_idx] + self._available_slots.add(physical_idx) + logger.debug( + f"Released physical slot {physical_idx} " + f"(client {client_addr} disconnected)" + ) + + def _evict_lru(self) -> None: + """Evict the least recently used physical slot to free space.""" + if not self._physical_slots: + return + + lru_slot = min(self._physical_slots.values(), key=lambda s: s.last_used) + + logger.warning( + f"Evicting LRU physical slot {lru_slot.physical_idx} " + f"(last used {time.monotonic() - lru_slot.last_used:.1f}s ago, " + f"clients: {lru_slot.clients})" + ) + + # Remove all client mappings pointing to this slot + for client_key in list(lru_slot.clients): + self._virtual_to_physical.pop(client_key, None) + + del self._physical_slots[lru_slot.physical_idx] + self._available_slots.add(lru_slot.physical_idx) diff --git a/src/meshcore_proxy/cli.py b/src/meshcore_proxy/cli.py index b2d7840..32fe691 100644 --- a/src/meshcore_proxy/cli.py +++ b/src/meshcore_proxy/cli.py @@ -4,6 +4,7 @@ import asyncio import functools import logging +import os import signal import sys @@ -23,7 +24,7 @@ def parse_args() -> argparse.Namespace: meshcore-proxy --ble 12:34:56:78:90:AB # With event logging - meshcore-proxy --serial /dev/ttyUSB0 --log-events + meshcore-proxy --serial /dev/ttyUSB0 --log-level debug # Specify TCP port meshcore-proxy --serial /dev/ttyUSB0 --port 5000 @@ -31,76 +32,75 @@ def parse_args() -> argparse.Namespace: ) # Connection type (mutually exclusive) - conn_group = parser.add_mutually_exclusive_group(required=True) + # Allow env vars so docker-compose can configure without modifying command + conn_group = parser.add_mutually_exclusive_group( + required=not (os.environ.get("SERIAL_PORT") or os.environ.get("BLE_ADDRESS")), + ) conn_group.add_argument( "--serial", metavar="PORT", - help="Serial port path (e.g., /dev/ttyUSB0)", + default=os.environ.get("SERIAL_PORT"), + help="Serial port path (e.g., /dev/ttyUSB0) [env: SERIAL_PORT]", ) conn_group.add_argument( "--ble", metavar="MAC", - help="BLE device MAC address (e.g., 12:34:56:78:90:AB)", + default=os.environ.get("BLE_ADDRESS"), + help="BLE device MAC address (e.g., 12:34:56:78:90:AB) [env: BLE_ADDRESS]", ) # TCP server options parser.add_argument( "--host", - default="0.0.0.0", - help="TCP server bind address (default: 0.0.0.0)", + default=os.environ.get("TCP_HOST", "0.0.0.0"), + help="TCP server bind address (default: 0.0.0.0) [env: TCP_HOST]", ) parser.add_argument( "--port", type=int, - default=5000, - help="TCP server port (default: 5000)", + default=int(os.environ.get("TCP_PORT", "5000")), + help="TCP server port (default: 5000) [env: TCP_PORT]", ) # Serial options parser.add_argument( "--baud", type=int, - default=115200, - help="Serial baud rate (default: 115200)", + default=int(os.environ.get("BAUD_RATE", "115200")), + help="Serial baud rate (default: 115200) [env: BAUD_RATE]", ) # BLE options parser.add_argument( "--ble-pin", - default="123456", - help="BLE pairing PIN (default: 123456)", + default=os.environ.get("BLE_PIN", "123456"), + help="BLE pairing PIN (default: 123456) [env: BLE_PIN]", ) - # Event logging options (mutually exclusive) - log_group = parser.add_mutually_exclusive_group() - log_group.add_argument( - "--quiet", - action="store_true", - help="Suppress non-error output", - ) - log_group.add_argument( - "--log-events", - action="store_true", - help="Log event summaries (type, direction, basic info)", - ) - log_group.add_argument( - "--log-events-verbose", - action="store_true", - help="Log full decoded event details", + # Log level: controls both Python logging and event output verbosity + # off=errors only, error=errors only, warning=warnings+errors, + # info=normal (default), debug=debug logging, verbose=debug + full event details + parser.add_argument( + "--log-level", + default=os.environ.get("LOG_LEVEL", "info"), + choices=["off", "error", "warning", "info", "debug", "verbose"], + help="Log verbosity: off, error, warning, info, debug, verbose (default: info) [env: LOG_LEVEL]", ) # Output format parser.add_argument( "--json", action="store_true", - help="Output event logs as JSON (for parsing)", + default=os.environ.get("LOG_JSON", "").lower() in ("1", "true", "yes"), + help="Output event logs as JSON (for parsing) [env: LOG_JSON]", ) - # Debug logging + # Channel virtualization parser.add_argument( - "--debug", + "--virtualize-channels", action="store_true", - help="Enable debug logging", + default=os.environ.get("VIRTUALIZE_CHANNELS", "").lower() in ("1", "true", "yes"), + help="Virtualize channel slots for multi-client isolation [env: VIRTUALIZE_CHANNELS]", ) return parser.parse_args() @@ -152,23 +152,26 @@ def main() -> int: """Main entry point.""" args = parse_args() - # Determine event log level - if args.quiet: - event_log_level = EventLogLevel.OFF - elif args.log_events_verbose: - event_log_level = EventLogLevel.VERBOSE - elif args.log_events: - event_log_level = EventLogLevel.SUMMARY - else: - event_log_level = EventLogLevel.OFF - - # Configure logging - if args.quiet: - log_level = logging.ERROR - elif args.debug: - log_level = logging.DEBUG - else: - log_level = logging.INFO + # Map unified log level to Python logging and event verbosity + level = args.log_level.lower() + log_level_map = { + "off": logging.ERROR, + "error": logging.ERROR, + "warning": logging.WARNING, + "info": logging.INFO, + "debug": logging.DEBUG, + "verbose": logging.DEBUG, + } + event_level_map = { + "off": EventLogLevel.OFF, + "error": EventLogLevel.OFF, + "warning": EventLogLevel.OFF, + "info": EventLogLevel.SUMMARY, + "debug": EventLogLevel.SUMMARY, + "verbose": EventLogLevel.VERBOSE, + } + log_level = log_level_map[level] + event_log_level = event_level_map[level] logging.basicConfig( level=log_level, @@ -186,6 +189,7 @@ def main() -> int: tcp_port=args.port, event_log_level=event_log_level, event_log_json=args.json, + virtualize_channels=args.virtualize_channels, ) # Run with signal handling diff --git a/src/meshcore_proxy/proxy.py b/src/meshcore_proxy/proxy.py index 75b9ff7..71c3489 100644 --- a/src/meshcore_proxy/proxy.py +++ b/src/meshcore_proxy/proxy.py @@ -9,6 +9,7 @@ from enum import Enum from typing import Any, Callable, Optional +from .channel_virtualizer import ChannelSlotAllocator from .decoder import decode_command, decode_response, format_decoded logger = logging.getLogger(__name__) @@ -124,6 +125,7 @@ def __init__( tcp_port: int = 5000, event_log_level: EventLogLevel = EventLogLevel.OFF, event_log_json: bool = False, + virtualize_channels: bool = False, ): self.serial_port = serial_port self.ble_address = ble_address @@ -133,6 +135,11 @@ def __init__( self.tcp_port = tcp_port self.event_log_level = event_log_level self.event_log_json = event_log_json + self.virtualize_channels = virtualize_channels + self._channel_allocator: Optional[ChannelSlotAllocator] = None + if virtualize_channels: + self._channel_allocator = ChannelSlotAllocator() + logger.info("Channel slot virtualization enabled") self._radio_connection: Optional[SerialConnection | BLEConnection] = None self._tcp_server: Optional[asyncio.Server] = None @@ -140,6 +147,8 @@ def __init__( self._is_ble = False self._is_running = False self._radio_connected = False + self._command_queue: asyncio.Queue = asyncio.Queue(maxsize=100) + self._queue_worker_task: Optional[asyncio.Task] = None async def _handle_radio_disconnect(self, reason: Optional[str] = None) -> None: """Handle radio disconnection.""" @@ -214,11 +223,17 @@ async def _handle_radio_rx(self, payload: bytes) -> None: self._log_event("FROM_RADIO", packet_type, payload) # Frame and forward to all TCP clients - framed = self._frame_payload(payload) disconnected = [] for addr, client in self._clients.items(): try: + # Rewrite channel indices per-client if virtualization enabled + client_payload = payload + if self._channel_allocator: + client_payload = self._channel_allocator.process_incoming( + addr, payload + ) + framed = self._frame_payload(client_payload) client.writer.write(framed) await client.writer.drain() except Exception as e: @@ -249,6 +264,29 @@ async def _send_to_radio(self, payload: bytes) -> None: logger.error(f"Failed to send to radio: {e}") await self._handle_radio_disconnect() + async def _run_command_queue(self) -> None: + """Worker that pulls commands from the queue and sends them to the radio.""" + while True: + payload = await self._command_queue.get() + try: + if not self._radio_connected: + logger.warning( + "Command dropped: radio not connected " + f"(queue depth: {self._command_queue.qsize()})" + ) + continue + logger.debug( + f"Sending queued command " + f"(queue depth: {self._command_queue.qsize()})" + ) + await self._send_to_radio(payload) + except asyncio.CancelledError: + raise + except Exception as e: + logger.error(f"Queue worker error: {e}") + finally: + self._command_queue.task_done() + def _parse_tcp_frame(self, client: TCPClient, data: bytes) -> list[bytes]: """ Parse incoming TCP data into complete frames. @@ -266,6 +304,14 @@ def _parse_tcp_frame(self, client: TCPClient, data: bytes) -> list[bytes]: header_needed = 3 - len(client.header) if len(remaining) >= header_needed: client.header = client.header + remaining[:header_needed] + if client.header[0:1] != b"\x3c": + logger.warning( + f"Invalid frame header from {client.addr}: " + f"expected 0x3c, got 0x{client.header[0]:02x}" + ) + client.header = b"" + offset += header_needed + continue client.frame_started = True client.frame_size = int.from_bytes(client.header[1:], byteorder="little") offset += header_needed @@ -294,6 +340,8 @@ async def _remove_client(self, addr: tuple) -> None: """Remove a client and close its connection.""" if addr in self._clients: client = self._clients.pop(addr) + if self._channel_allocator: + self._channel_allocator.remove_client(addr) try: client.writer.close() await client.writer.wait_closed() @@ -322,9 +370,24 @@ async def _handle_tcp_client( # Parse frames from the TCP data payloads = self._parse_tcp_frame(client, data) - # Forward each complete payload to the radio + # Enqueue each complete payload for serialized sending for payload in payloads: - await self._send_to_radio(payload) + # Apply channel virtualization if enabled + if self._channel_allocator: + payload = self._channel_allocator.process_outgoing( + addr, payload + ) + if payload is None: + logger.debug( + f"SET_CHANNEL from {addr} suppressed (dedup hit)" + ) + continue + + logger.debug( + f"Command enqueued from {addr} " + f"(queue depth: {self._command_queue.qsize()})" + ) + await self._command_queue.put(payload) except asyncio.CancelledError: pass @@ -393,6 +456,7 @@ async def run(self) -> None: logger.info(f"Starting MeshCore Proxy ({conn_type}: {conn_target})...") await self._start_tcp_server() + self._queue_worker_task = asyncio.create_task(self._run_command_queue()) reconnect_delay = 5 # Initial delay in seconds max_delay = 300 # 5 minutes @@ -425,6 +489,26 @@ async def stop(self) -> None: logger.info("Stopping MeshCore Proxy...") self._is_running = False + # Cancel queue worker + if self._queue_worker_task: + self._queue_worker_task.cancel() + try: + await self._queue_worker_task + except asyncio.CancelledError: + pass + + # Discard remaining queued commands + remaining = 0 + while not self._command_queue.empty(): + try: + self._command_queue.get_nowait() + self._command_queue.task_done() + remaining += 1 + except asyncio.QueueEmpty: + break + if remaining: + logger.warning(f"Discarded {remaining} queued commands on shutdown") + # Close all clients for addr in list(self._clients.keys()): await self._remove_client(addr) diff --git a/tests/test_channel_virtualizer.py b/tests/test_channel_virtualizer.py new file mode 100644 index 0000000..e5385f1 --- /dev/null +++ b/tests/test_channel_virtualizer.py @@ -0,0 +1,414 @@ +# tests/test_channel_virtualizer.py +import time +from unittest.mock import patch +from meshcore_proxy.channel_virtualizer import ChannelSlotAllocator + + +def _make_set_channel_payload(virtual_idx: int, name: str, secret: bytes = None) -> bytes: + """Build a SET_CHANNEL (0x20) payload.""" + if secret is None: + from hashlib import sha256 + secret = sha256(name.encode("utf-8")).digest()[:16] + name_bytes = name.encode("utf-8")[:32].ljust(32, b"\x00") + return b"\x20" + virtual_idx.to_bytes(1, "little") + name_bytes + secret + + +def test_set_channel_allocates_physical_slot(): + alloc = ChannelSlotAllocator(max_slots=40) + client_addr = ("127.0.0.1", 9000) + payload = _make_set_channel_payload(0, "Weather") + + result = alloc.process_outgoing(client_addr, payload) + + assert result is not None, "Should return rewritten payload" + assert result[0] == 0x20, "Command type preserved" + physical_idx = result[1] + assert 0 <= physical_idx < 40 + assert result[2:] == payload[2:] + + +def test_set_channel_dedup_returns_none(): + """Second client setting same channel should return None (no radio command needed).""" + alloc = ChannelSlotAllocator(max_slots=40) + addr_a = ("127.0.0.1", 9000) + addr_b = ("127.0.0.1", 9001) + payload = _make_set_channel_payload(0, "Weather") + + alloc.process_outgoing(addr_a, payload) + result = alloc.process_outgoing(addr_b, payload) + + assert result is None, "Dedup hit: no command sent to radio" + + +# --- Task 2: SEND_CHAN_MSG and Reassignment --- + + +def _make_send_chan_msg_payload(chan_idx: int, msg: str, timestamp: int = 0) -> bytes: + """Build a SEND_CHAN_MSG (0x03) payload.""" + return ( + b"\x03\x00" + + chan_idx.to_bytes(1, "little") + + timestamp.to_bytes(4, "little") + + msg.encode("utf-8") + ) + + +def test_send_chan_msg_rewrites_index(): + alloc = ChannelSlotAllocator(max_slots=40) + addr = ("127.0.0.1", 9000) + + set_payload = _make_set_channel_payload(5, "Weather") + result = alloc.process_outgoing(addr, set_payload) + physical_idx = result[1] + + msg_payload = _make_send_chan_msg_payload(5, "hello") + result = alloc.process_outgoing(addr, msg_payload) + + assert result[0] == 0x03, "Command type preserved" + assert result[1] == 0x00, "Flags preserved" + assert result[2] == physical_idx, "Channel index rewritten to physical" + assert result[7:] == b"hello", "Message text preserved" + + +def test_send_chan_msg_unmapped_passes_through(): + alloc = ChannelSlotAllocator(max_slots=40) + addr = ("127.0.0.1", 9000) + + msg_payload = _make_send_chan_msg_payload(3, "hello") + result = alloc.process_outgoing(addr, msg_payload) + + assert result == msg_payload, "Unmapped slot passes through unchanged" + + +def test_reassignment_releases_old_slot(): + """When a client reassigns a virtual slot, the old physical slot is released.""" + alloc = ChannelSlotAllocator(max_slots=40) + addr = ("127.0.0.1", 9000) + + payload_w = _make_set_channel_payload(0, "Weather") + result_w = alloc.process_outgoing(addr, payload_w) + physical_w = result_w[1] + + # Occupy the slot that would be reclaimed so reassignment must pick a new one + addr_hold = ("127.0.0.1", 9999) + alloc.process_outgoing(addr_hold, _make_set_channel_payload(0, "Weather")) + + payload_n = _make_set_channel_payload(0, "News") + result_n = alloc.process_outgoing(addr, payload_n) + physical_n = result_n[1] + + assert physical_w != physical_n, "Different channels get different physical slots" + # Weather slot is still held by addr_hold, so it remains allocated (not available) + assert physical_w not in alloc._available_slots + + +def test_reassignment_keeps_shared_slot(): + """Reassignment doesn't release a physical slot still used by another client.""" + alloc = ChannelSlotAllocator(max_slots=40) + addr_a = ("127.0.0.1", 9000) + addr_b = ("127.0.0.1", 9001) + + payload = _make_set_channel_payload(0, "Weather") + alloc.process_outgoing(addr_a, payload) + alloc.process_outgoing(addr_b, payload) + + payload_n = _make_set_channel_payload(0, "News") + alloc.process_outgoing(addr_b, payload_n) + + phys_weather = alloc._virtual_to_physical[(addr_a, 0)] + assert phys_weather in alloc._physical_slots + + +# --- Task 3: LRU Eviction --- + + +def test_lru_eviction_when_slots_exhausted(): + """When all slots are full, the least recently used slot is evicted.""" + alloc = ChannelSlotAllocator(max_slots=2) + addr = ("127.0.0.1", 9000) + + with patch("meshcore_proxy.channel_virtualizer.time") as mock_time: + mock_time.monotonic.return_value = 100.0 + alloc.process_outgoing(addr, _make_set_channel_payload(0, "ChannelA")) + # Fix up last_used (default_factory captures real time.monotonic) + alloc._physical_slots[alloc._virtual_to_physical[(addr, 0)]].last_used = 100.0 + + mock_time.monotonic.return_value = 200.0 + alloc.process_outgoing(addr, _make_set_channel_payload(1, "ChannelB")) + alloc._physical_slots[alloc._virtual_to_physical[(addr, 1)]].last_used = 200.0 + + mock_time.monotonic.return_value = 300.0 + result = alloc.process_outgoing(addr, _make_set_channel_payload(2, "ChannelC")) + + assert result is not None, "Should allocate after eviction" + assert len(alloc._physical_slots) == 2, "Still only 2 slots used" + assert (addr, 0) not in alloc._virtual_to_physical + + +def test_lru_eviction_respects_recent_usage(): + """LRU eviction picks the slot that was used least recently.""" + alloc = ChannelSlotAllocator(max_slots=2) + addr = ("127.0.0.1", 9000) + + with patch("meshcore_proxy.channel_virtualizer.time") as mock_time: + mock_time.monotonic.return_value = 100.0 + alloc.process_outgoing(addr, _make_set_channel_payload(0, "ChannelA")) + alloc._physical_slots[alloc._virtual_to_physical[(addr, 0)]].last_used = 100.0 + + mock_time.monotonic.return_value = 200.0 + alloc.process_outgoing(addr, _make_set_channel_payload(1, "ChannelB")) + alloc._physical_slots[alloc._virtual_to_physical[(addr, 1)]].last_used = 200.0 + + mock_time.monotonic.return_value = 300.0 + alloc.process_outgoing(addr, _make_send_chan_msg_payload(0, "ping")) + + mock_time.monotonic.return_value = 400.0 + alloc.process_outgoing(addr, _make_set_channel_payload(2, "ChannelC")) + + assert (addr, 0) in alloc._virtual_to_physical, "ChannelA survives (recently used)" + assert (addr, 1) not in alloc._virtual_to_physical, "ChannelB evicted (LRU)" + + +# --- Task 4: Response Rewriting and Client Disconnect --- + + +def _make_channel_info_response(channel_idx: int, name: str) -> bytes: + """Build a CHANNEL_INFO (0x12) response payload.""" + name_bytes = name.encode("utf-8")[:32].ljust(32, b"\x00") + return b"\x12" + channel_idx.to_bytes(1, "little") + name_bytes + + +def _make_channel_msg_recv_response( + channel_idx: int, path_len: int = 1, txt_type: int = 0, + timestamp: int = 0, text: str = "hello", +) -> bytes: + """Build a CHANNEL_MSG_RECV (0x08) response payload.""" + return ( + b"\x08" + + channel_idx.to_bytes(1, "little") + + path_len.to_bytes(1, "little") + + txt_type.to_bytes(1, "little") + + timestamp.to_bytes(4, "little") + + text.encode("utf-8") + ) + + +def test_response_rewrites_channel_info_for_mapped_client(): + alloc = ChannelSlotAllocator(max_slots=40) + addr = ("127.0.0.1", 9000) + + set_payload = _make_set_channel_payload(5, "Weather") + result = alloc.process_outgoing(addr, set_payload) + physical_idx = result[1] + + response = _make_channel_info_response(physical_idx, "Weather") + rewritten = alloc.process_incoming(addr, response) + + assert rewritten[1] == 5, "Physical index rewritten to virtual index" + assert rewritten[0] == 0x12, "Response type preserved" + assert rewritten[2:] == response[2:], "Rest of payload unchanged" + + +def test_response_rewrites_channel_msg_recv(): + alloc = ChannelSlotAllocator(max_slots=40) + addr = ("127.0.0.1", 9000) + + set_payload = _make_set_channel_payload(3, "Alerts") + result = alloc.process_outgoing(addr, set_payload) + physical_idx = result[1] + + response = _make_channel_msg_recv_response(physical_idx, text="alert!") + rewritten = alloc.process_incoming(addr, response) + + assert rewritten[1] == 3, "Physical index rewritten to virtual index" + assert rewritten[0] == 0x08, "Response type preserved" + + +def test_response_passes_through_for_unmapped_client(): + alloc = ChannelSlotAllocator(max_slots=40) + addr_a = ("127.0.0.1", 9000) + addr_b = ("127.0.0.1", 9001) + + set_payload = _make_set_channel_payload(0, "Weather") + result = alloc.process_outgoing(addr_a, set_payload) + physical_idx = result[1] + + response = _make_channel_info_response(physical_idx, "Weather") + rewritten = alloc.process_incoming(addr_b, response) + + assert rewritten == response, "Unmapped client gets raw physical index" + + +def test_non_channel_response_passes_through(): + alloc = ChannelSlotAllocator(max_slots=40) + addr = ("127.0.0.1", 9000) + + response = b"\x0c\x64\x00" + rewritten = alloc.process_incoming(addr, response) + + assert rewritten == response, "Non-channel response unchanged" + + +def test_remove_client_releases_exclusive_slots(): + alloc = ChannelSlotAllocator(max_slots=40) + addr = ("127.0.0.1", 9000) + + alloc.process_outgoing(addr, _make_set_channel_payload(0, "Weather")) + alloc.process_outgoing(addr, _make_set_channel_payload(1, "News")) + + assert len(alloc._physical_slots) == 2 + + alloc.remove_client(addr) + + assert len(alloc._physical_slots) == 0, "All slots released" + assert len(alloc._virtual_to_physical) == 0, "All mappings cleared" + assert len(alloc._available_slots) == 40, "All slots available" + + +def test_remove_client_keeps_shared_slots(): + alloc = ChannelSlotAllocator(max_slots=40) + addr_a = ("127.0.0.1", 9000) + addr_b = ("127.0.0.1", 9001) + + payload = _make_set_channel_payload(0, "Weather") + alloc.process_outgoing(addr_a, payload) + alloc.process_outgoing(addr_b, payload) + + alloc.remove_client(addr_a) + + assert len(alloc._physical_slots) == 1 + assert (addr_b, 0) in alloc._virtual_to_physical + + +# --- Task 5: GET_CHANNEL and Pass-Through --- + + +def _make_get_channel_payload(channel_idx: int) -> bytes: + """Build a GET_CHANNEL (0x1F) payload.""" + return b"\x1f" + channel_idx.to_bytes(1, "little") + + +def test_get_channel_rewrites_mapped_index(): + alloc = ChannelSlotAllocator(max_slots=40) + addr = ("127.0.0.1", 9000) + + set_payload = _make_set_channel_payload(7, "Weather") + result = alloc.process_outgoing(addr, set_payload) + physical_idx = result[1] + + get_payload = _make_get_channel_payload(7) + result = alloc.process_outgoing(addr, get_payload) + + assert result[0] == 0x1F, "Command type preserved" + assert result[1] == physical_idx, "Index rewritten to physical" + + +def test_get_channel_unmapped_passes_through(): + alloc = ChannelSlotAllocator(max_slots=40) + addr = ("127.0.0.1", 9000) + + get_payload = _make_get_channel_payload(3) + result = alloc.process_outgoing(addr, get_payload) + + assert result == get_payload, "Unmapped GET_CHANNEL passes through" + + +def test_non_channel_command_passes_through(): + alloc = ChannelSlotAllocator(max_slots=40) + addr = ("127.0.0.1", 9000) + + payload = b"\x01" + result = alloc.process_outgoing(addr, payload) + + assert result == payload, "Non-channel command unchanged" + + +def test_empty_payload_passes_through(): + alloc = ChannelSlotAllocator(max_slots=40) + addr = ("127.0.0.1", 9000) + + result = alloc.process_outgoing(addr, b"") + assert result == b"" + + result = alloc.process_incoming(addr, b"") + assert result == b"" + + +# --- Task 8: End-to-End Multi-Client Scenario Tests --- + + +def test_full_scenario_from_spec(): + """ + End-to-end test matching the example scenario in the design spec: + Two clients, dedup, reassignment, disconnect. + """ + alloc = ChannelSlotAllocator(max_slots=40) + addr_a = ("127.0.0.1", 9000) + addr_b = ("127.0.0.1", 9001) + + from hashlib import sha256 + secret_w = sha256("Weather".encode()).digest()[:16] + secret_n = sha256("News".encode()).digest()[:16] + + # 1. Client A: SET_CHANNEL(virtual=0, "Weather") + result = alloc.process_outgoing(addr_a, _make_set_channel_payload(0, "Weather")) + assert result is not None + phys_weather = result[1] + + # 2. Client B: SET_CHANNEL(virtual=0, "Weather") - dedup + result = alloc.process_outgoing(addr_b, _make_set_channel_payload(0, "Weather")) + assert result is None, "Dedup hit" + assert alloc._virtual_to_physical[(addr_b, 0)] == phys_weather + + # 3. Client A: SEND_CHAN_MSG(chan=0, "storm warning") + msg_a = _make_send_chan_msg_payload(0, "storm warning") + result = alloc.process_outgoing(addr_a, msg_a) + assert result[2] == phys_weather + + # 4. Client B: SET_CHANNEL(virtual=0, "News") - reassignment + result = alloc.process_outgoing(addr_b, _make_set_channel_payload(0, "News")) + assert result is not None + phys_news = result[1] + assert phys_news != phys_weather + # Weather slot still alive (Client A) + assert phys_weather in alloc._physical_slots + + # 5. Client B: SEND_CHAN_MSG(chan=0, "headlines") + msg_b = _make_send_chan_msg_payload(0, "headlines") + result = alloc.process_outgoing(addr_b, msg_b) + assert result[2] == phys_news + + # 6. Client A disconnects + alloc.remove_client(addr_a) + assert phys_weather not in alloc._physical_slots, "Weather slot released" + assert phys_news in alloc._physical_slots, "News slot survives" + + # 7. Client B disconnects + alloc.remove_client(addr_b) + assert len(alloc._physical_slots) == 0 + assert len(alloc._available_slots) == 40 + + +def test_remote_terminal_cycling_pattern(): + """ + Test the Remote-Terminal pattern: one client cycling through many channels + on a single virtual slot. Should only consume one physical slot at a time. + """ + alloc = ChannelSlotAllocator(max_slots=40) + addr = ("127.0.0.1", 9000) + + for i in range(100): + name = f"Channel{i}" + payload = _make_set_channel_payload(0, name) + result = alloc.process_outgoing(addr, payload) + assert result is not None, f"Channel {i} should allocate" + + # Only 1 physical slot should be in use at any time + assert len(alloc._physical_slots) == 1, ( + f"Iteration {i}: expected 1 physical slot, got {len(alloc._physical_slots)}" + ) + + # Send a message on it + msg = _make_send_chan_msg_payload(0, f"msg{i}") + result = alloc.process_outgoing(addr, msg) + assert result is not None diff --git a/tests/test_proxy.py b/tests/test_proxy.py index 2287d80..283a86c 100644 --- a/tests/test_proxy.py +++ b/tests/test_proxy.py @@ -2,7 +2,7 @@ from unittest.mock import patch import pytest -from meshcore_proxy.proxy import EventLogLevel, MeshCoreProxy +from meshcore_proxy.proxy import EventLogLevel, MeshCoreProxy, TCPClient class MockRadio: @@ -132,3 +132,284 @@ async def test_backoff_delay(mock_serial_connection): await proxy_task except asyncio.CancelledError: pass + + +@pytest.mark.asyncio +@patch("meshcore_proxy.proxy.SerialConnection") +async def test_commands_serialized_through_queue(mock_serial_connection): + """ + Tests that commands from multiple clients are serialized through the queue + and arrive at the radio one at a time in FIFO order. + """ + mock_radio = MockRadio(connect_fails=0) + mock_serial_connection.return_value = mock_radio + + proxy = MeshCoreProxy( + serial_port="/dev/ttyUSB0", + event_log_level=EventLogLevel.OFF, + tcp_port=5010, + ) + + proxy_task = asyncio.create_task(proxy.run()) + await asyncio.sleep(1) + assert proxy._radio_connected + + # Enqueue three commands + payload_a = b"\x01" # CMD_APPSTART + payload_b = b"\x14" # CMD_GET_BATTERY + payload_c = b"\x05" # CMD_GET_TIME + + await proxy._command_queue.put(payload_a) + await proxy._command_queue.put(payload_b) + await proxy._command_queue.put(payload_c) + + # Give the worker time to process + await asyncio.sleep(0.5) + + assert mock_radio.send_buffer == [payload_a, payload_b, payload_c] + + proxy_task.cancel() + try: + await proxy_task + except asyncio.CancelledError: + pass + + +@pytest.mark.asyncio +@patch("meshcore_proxy.proxy.SerialConnection") +async def test_tcp_client_commands_go_through_queue(mock_serial_connection): + """ + Tests that commands received from a TCP client are routed through the + command queue rather than sent directly to the radio. + """ + mock_radio = MockRadio(connect_fails=0) + mock_serial_connection.return_value = mock_radio + + proxy = MeshCoreProxy( + serial_port="/dev/ttyUSB0", + event_log_level=EventLogLevel.OFF, + tcp_port=5011, + ) + + proxy_task = asyncio.create_task(proxy.run()) + await asyncio.sleep(1) + assert proxy._radio_connected + + # Connect a TCP client and send a framed command + reader, writer = await asyncio.open_connection("127.0.0.1", 5011) + + # Send a framed CMD_APPSTART: 0x3c + 2-byte size (1, little-endian) + payload + payload = b"\x01" + frame = b"\x3c" + len(payload).to_bytes(2, byteorder="little") + payload + writer.write(frame) + await writer.drain() + + # Give time for processing + await asyncio.sleep(0.5) + + assert mock_radio.send_buffer == [payload] + + writer.close() + await writer.wait_closed() + + proxy_task.cancel() + try: + await proxy_task + except asyncio.CancelledError: + pass + + +@pytest.mark.asyncio +@patch("meshcore_proxy.proxy.SerialConnection") +async def test_commands_dropped_when_radio_disconnected(mock_serial_connection): + """ + Tests that commands enqueued while the radio is disconnected are dropped + with a warning rather than causing errors. + """ + mock_radio = MockRadio(connect_fails=0) + mock_serial_connection.return_value = mock_radio + + proxy = MeshCoreProxy( + serial_port="/dev/ttyUSB0", + event_log_level=EventLogLevel.OFF, + tcp_port=5012, + ) + + proxy_task = asyncio.create_task(proxy.run()) + await asyncio.sleep(1) + assert proxy._radio_connected + + # Disconnect the radio and prevent reconnection + mock_radio.connect_fails = 999 + await mock_radio.disconnect() + assert not proxy._radio_connected + + # Record send count before enqueuing + send_count_before = len(mock_radio.send_buffer) + + # Enqueue a command while disconnected + await proxy._command_queue.put(b"\x01") + + # Give the worker time to process + await asyncio.sleep(0.5) + + # Command should not appear in send buffer (it was dropped) + assert len(mock_radio.send_buffer) == send_count_before + + proxy_task.cancel() + try: + await proxy_task + except asyncio.CancelledError: + pass + + +def test_frame_payload_uses_server_direction_byte(): + """ + Tests that _frame_payload uses 0x3E (server -> client direction byte) + per the MeshCore TCP framing protocol. + """ + proxy = MeshCoreProxy(serial_port="/dev/ttyUSB0") + payload = b"\x05\x01\x02" + framed = proxy._frame_payload(payload) + + assert framed[0:1] == b"\x3e", "Direction byte should be 0x3E (server -> client)" + assert framed[1:3] == len(payload).to_bytes(2, byteorder="little"), "Size should be little-endian" + assert framed[3:] == payload, "Payload should follow header unchanged" + + +def test_parse_tcp_frame_accepts_valid_header(): + """ + Tests that _parse_tcp_frame accepts frames with 0x3C direction byte. + """ + proxy = MeshCoreProxy(serial_port="/dev/ttyUSB0") + client = TCPClient(reader=None, writer=None, addr=("127.0.0.1", 9999)) + + payload = b"\x01\x02\x03" + frame = b"\x3c" + len(payload).to_bytes(2, byteorder="little") + payload + + result = proxy._parse_tcp_frame(client, frame) + assert result == [payload] + + +def test_parse_tcp_frame_rejects_invalid_header(): + """ + Tests that _parse_tcp_frame discards frames with wrong direction byte. + """ + proxy = MeshCoreProxy(serial_port="/dev/ttyUSB0") + client = TCPClient(reader=None, writer=None, addr=("127.0.0.1", 9999)) + + payload = b"\x01\x02\x03" + # Use 0x3E (server -> client) instead of 0x3C (client -> server) + bad_frame = b"\x3e" + len(payload).to_bytes(2, byteorder="little") + payload + + result = proxy._parse_tcp_frame(client, bad_frame) + assert result == [], "Frame with wrong direction byte should be discarded" + + +@pytest.mark.asyncio +@patch("meshcore_proxy.proxy.SerialConnection") +async def test_virtualized_channels_rewrite_set_channel(mock_serial_connection): + """ + Tests that with virtualize_channels enabled, SET_CHANNEL commands get + rewritten with physical slot indices. + """ + mock_radio = MockRadio(connect_fails=0) + mock_serial_connection.return_value = mock_radio + + proxy = MeshCoreProxy( + serial_port="/dev/ttyUSB0", + event_log_level=EventLogLevel.OFF, + tcp_port=5020, + virtualize_channels=True, + ) + + proxy_task = asyncio.create_task(proxy.run()) + await asyncio.sleep(1) + assert proxy._radio_connected + + # Connect TCP client and send SET_CHANNEL + reader, writer = await asyncio.open_connection("127.0.0.1", 5020) + + # Build SET_CHANNEL(virtual_idx=5, "TestChannel") + from hashlib import sha256 + name = "TestChannel" + name_bytes = name.encode("utf-8")[:32].ljust(32, b"\x00") + secret = sha256(name.encode("utf-8")).digest()[:16] + payload = b"\x20\x05" + name_bytes + secret + frame = b"\x3c" + len(payload).to_bytes(2, byteorder="little") + payload + + writer.write(frame) + await writer.drain() + await asyncio.sleep(0.5) + + # Radio should receive SET_CHANNEL with a physical index (not necessarily 5) + assert len(mock_radio.send_buffer) == 1 + sent = mock_radio.send_buffer[0] + assert sent[0] == 0x20, "Command type preserved" + assert sent[2:] == payload[2:], "Config bytes preserved" + physical_idx = sent[1] + assert 0 <= physical_idx < 40 + + writer.close() + await writer.wait_closed() + + proxy_task.cancel() + try: + await proxy_task + except asyncio.CancelledError: + pass + + +@pytest.mark.asyncio +@patch("meshcore_proxy.proxy.SerialConnection") +async def test_virtualized_channels_dedup_suppresses_radio_command(mock_serial_connection): + """ + Tests that duplicate SET_CHANNEL from a second client is suppressed + (not sent to radio). + """ + mock_radio = MockRadio(connect_fails=0) + mock_serial_connection.return_value = mock_radio + + proxy = MeshCoreProxy( + serial_port="/dev/ttyUSB0", + event_log_level=EventLogLevel.OFF, + tcp_port=5021, + virtualize_channels=True, + ) + + proxy_task = asyncio.create_task(proxy.run()) + await asyncio.sleep(1) + assert proxy._radio_connected + + # Build the same SET_CHANNEL payload for both clients + from hashlib import sha256 + name = "SharedChannel" + name_bytes = name.encode("utf-8")[:32].ljust(32, b"\x00") + secret = sha256(name.encode("utf-8")).digest()[:16] + payload = b"\x20\x00" + name_bytes + secret + frame = b"\x3c" + len(payload).to_bytes(2, byteorder="little") + payload + + # Client A sends SET_CHANNEL + reader_a, writer_a = await asyncio.open_connection("127.0.0.1", 5021) + writer_a.write(frame) + await writer_a.drain() + await asyncio.sleep(0.5) + assert len(mock_radio.send_buffer) == 1, "First SET_CHANNEL sent to radio" + + # Client B sends identical SET_CHANNEL + reader_b, writer_b = await asyncio.open_connection("127.0.0.1", 5021) + writer_b.write(frame) + await writer_b.drain() + await asyncio.sleep(0.5) + assert len(mock_radio.send_buffer) == 1, "Dedup: second SET_CHANNEL NOT sent to radio" + + writer_a.close() + writer_b.close() + await writer_a.wait_closed() + await writer_b.wait_closed() + + proxy_task.cancel() + try: + await proxy_task + except asyncio.CancelledError: + pass