feat: FIFO command queue + channel slot virtualization for multi-client access#13
feat: FIFO command queue + channel slot virtualization for multi-client access#13
Conversation
There was a problem hiding this comment.
Pull request overview
Adds a FIFO asyncio.Queue–backed command pipeline so multiple TCP clients can safely share a single radio connection without interleaving outbound commands.
Changes:
- Introduces a background queue worker to serialize outbound commands to the radio.
- Updates TCP client handling to enqueue commands instead of sending directly.
- Adds async tests covering FIFO ordering, TCP → queue → radio flow, and disconnected-radio behavior.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
src/meshcore_proxy/proxy.py |
Adds command queue + worker, routes TCP client commands through the queue, and cancels/drains the queue on shutdown. |
tests/test_proxy.py |
Adds tests for queue serialization and disconnected-radio handling. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| await self._send_to_radio(payload) | ||
| except Exception as e: | ||
| logger.error(f"Queue worker error: {e}") | ||
| finally: |
There was a problem hiding this comment.
_run_command_queue catches Exception, which includes asyncio.CancelledError. If the worker is cancelled while awaiting _send_to_radio, the cancellation will be swallowed and the task will keep running, potentially causing stop() to hang while awaiting _queue_worker_task. Handle asyncio.CancelledError explicitly (re-raise) before the broad exception handler so cancellation reliably terminates the worker.
src/meshcore_proxy/proxy.py
Outdated
| self._command_queue: asyncio.Queue = asyncio.Queue() | ||
| self._queue_worker_task: Optional[asyncio.Task] = None |
There was a problem hiding this comment.
_command_queue is unbounded. Since TCP handlers now put() quickly without waiting on radio I/O, a fast/malicious client can enqueue commands faster than the single worker can drain, leading to unbounded memory growth. Consider giving the queue a maxsize and applying backpressure (awaiting put) or dropping commands with a clear warning when the queue is full.
| # Command should not appear in send buffer (it was dropped) | ||
| # The radio had no sends after disconnection | ||
| send_count_before = len(mock_radio.send_buffer) | ||
| assert len(mock_radio.send_buffer) == send_count_before |
There was a problem hiding this comment.
This assertion is a no-op: send_count_before is captured after the enqueue + wait, and then compared to the current length, so it will always pass. Capture send_count_before before enqueuing (or assert mock_radio.send_buffer is unchanged/empty) so the test actually verifies that the command was dropped while disconnected.
Add asyncio.Queue and worker task to MeshCoreProxy so that commands are serialized through the queue and sent to the radio one at a time in FIFO order. The worker is started in run() and cancelled in stop(). Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
Adds a publish-ghcr-dev job that builds and pushes ghcr.io/rgregg/meshcore-proxy:dev-<PR number> on pull requests. Requires approval via the dev-deploy environment. Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
- Re-raise CancelledError in queue worker so shutdown isn't blocked - Add maxsize=100 to command queue to bound memory usage - Fix no-op assertion in disconnect test (capture count before enqueue) - Prevent reconnection in disconnect test to avoid race condition Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
0543ae5 to
44253bb
Compare
Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
Reject frames that don't start with 0x3C (client -> server) instead of blindly parsing them. Logs a warning with the invalid byte value. Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
Cover SEND_CHAN_MSG rewriting, reassignment, LRU eviction, response rewriting, client disconnect, GET_CHANNEL, and pass-through. Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
Code review caught that the V3 variant of channel message responses wasn't being rewritten. Consolidated the response type check into a single tuple membership test. Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
All CLI flags now accept environment variable fallbacks, making Docker container configuration easier without modifying the command. Environment variables: SERIAL_PORT, BLE_ADDRESS, TCP_HOST, TCP_PORT, BAUD_RATE, BLE_PIN, LOG_LEVEL (off/summary/verbose), LOG_JSON, DEBUG, VIRTUALIZE_CHANNELS Updated docker-compose.yml to use environment variables instead of command arrays. Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
Skip the dev-deploy environment approval gate for PRs from branches in this repo (trusted). PRs from forks still require manual approval. Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
The Dockerfile CMD was --help, which caused containers configured purely via environment variables to loop (print help, exit, restart). Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
Replace --quiet, --log-events, --log-events-verbose, and --debug with a single --log-level flag: off, error, warning, info, debug, verbose. Simplifies Docker configuration to just LOG_LEVEL=debug. Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
Summary
asyncio.Queueto serialize commands from multiple TCP clients to the radio--virtualize-channels/VIRTUALIZE_CHANNELS=true) that gives each TCP client its own virtual channel slot space mapped to physical radio slotsMotivation
Support running Home Assistant integration, MeshCore client, and Remote-Terminal simultaneously against the same radio. The command queue prevents command interleaving, and channel virtualization prevents clients from overwriting each other's channel slot configurations.
New files
src/meshcore_proxy/channel_virtualizer.py-ChannelSlotAllocatorwith all virtualization logictests/test_channel_virtualizer.py- 20 unit testsEnvironment variables
All CLI flags now accept env var fallbacks:
SERIAL_PORT,BLE_ADDRESS,TCP_HOST,TCP_PORT,BAUD_RATE,BLE_PIN,LOG_LEVEL,LOG_JSON,DEBUG,VIRTUALIZE_CHANNELSTest plan
test_commands_serialized_through_queue- verifies FIFO ordering through queuetest_tcp_client_commands_go_through_queue- verifies end-to-end TCP client -> queue -> radio pathtest_commands_dropped_when_radio_disconnected- verifies graceful handling when radio is downghcr.io/rgregg/meshcore-proxy:dev-13and test withVIRTUALIZE_CHANNELS=true--virtualize-channels🤖 Generated with Claude Code