-
Notifications
You must be signed in to change notification settings - Fork 38
Fix/38363 bridge stall #1325
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix/38363 bridge stall #1325
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR fixes a critical bridge stall issue where MQTT bridges would fail to resume message forwarding after connection failures. The root cause involved inflight message markers not being properly cleared during reconnections, causing messages to be permanently stuck in persistence queues.
Key changes:
- Introduces separate handling for initial connections vs reconnections to avoid duplicate message delivery
- Adds
removeAllInFlightMarkersmethod to persistence layer to reset stuck messages - Implements new callbacks (
ResetAllInflightMarkersCallback,OnReconnectCallback) to coordinate reconnection state - Refactors
drainQueue()to properly clear all stale state and reset inflight markers
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
legacy-modbus-adapter-minimal-config.xml |
Removed unused test resource file |
legacy-modbus-adapter-full-config.xml |
Removed unused test resource file |
ClientQueueMemoryLocalPersistence.java |
Implements new method to remove all inflight markers for a queue |
ClientQueuePersistenceImpl.java |
Adds persistence layer method to reset all inflight markers and notify waiting consumers |
ClientQueuePersistence.java |
Extends interface with removeAllInFlightMarkers method |
ClientQueueLocalPersistence.java |
Extends interface with removeAllInFlightMarkers method |
RemoteMqttForwarder.java |
Major refactoring to handle reconnection scenarios, distinguish initial connection from reconnection, and properly reset state |
BridgeMqttClient.java |
Adds logic to distinguish initial connection from reconnection using everConnected flag |
MqttForwarder.java |
Extends interface with new methods for flushing buffered messages and handling reconnection |
MessageForwarderImpl.java |
Implements callbacks to reset inflight markers and trigger message polling after reconnection |
FINDINGS_BRIDGE.md |
Documents root cause analysis of the bridge stall issue |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
hivemq-edge/src/main/java/com/hivemq/bridge/mqtt/RemoteMqttForwarder.java
Outdated
Show resolved
Hide resolved
hivemq-edge/src/main/java/com/hivemq/bridge/mqtt/RemoteMqttForwarder.java
Outdated
Show resolved
Hide resolved
hivemq-edge/src/main/java/com/hivemq/bridge/mqtt/RemoteMqttForwarder.java
Show resolved
Hide resolved
Coverage Report
|
caoccao
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I hold one concern.
hivemq-edge/src/main/java/com/hivemq/bridge/mqtt/RemoteMqttForwarder.java
Show resolved
Hide resolved
| try { | ||
| // Wait for all inflight markers to be reset before returning | ||
| // This ensures onReconnect() will see clean queues when it triggers checkBuffers() | ||
| Futures.allAsList(futuresBuilder.build()).get(30, TimeUnit.SECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the 30 secs could be configurable, or a constant.
Also, on timeout, i.e. when the markers are not removed, we still do onReconnect but the markets would not be reset, they would be stuck. Might this be a potential issue?, should the wait be longer? should the exception be propagated as a RT and prevent the reconnect?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Turned it into a constant.
Not really want to make it configurable.
The value is deliberately high. Normally the operation should finish in <100ms. If we ever reach the timeout the IO-layer is in trouble. And that's where the reconnect logic I added kicks in.
| } | ||
| final ImmutableList.Builder<ListenableFuture<Void>> futuresBuilder = ImmutableList.builder(); | ||
| for (final String queueIdToReset : forwarderQueueIds) { | ||
| futuresBuilder.add(queuePersistence.get().removeAllInFlightMarkers(queueIdToReset)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if removeAllInFlightMarkers() is interrupted by Futures.allAsList(futuresBuilder.build()).get(30, TimeUnit.SECONDS)? Is there a rollback logic?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Normally the operation should finish in <100ms. If we ever reach the timeout the IO-layer is in trouble. And that's where the reconnect logic I added in the last commit kicks in.
|
Better rebase to latest master. This branch has too much noise now. |
|
… connection problems
…warder.java Co-authored-by: Copilot <[email protected]>
1. forceReconnect() disconnects the MQTT client 2. BridgeMqttClient's addDisconnectedListener() detects the disconnect 3. Auto-reconnect kicks in with exponential backoff (1s min, 2min max) 4. On reconnect, addConnectedListener() fires 5. This calls drainQueue() → resetAllInflightMarkersCallback again 6. If persistence is now responsive, markers are cleared and messages flow
6983fa9 to
672d008
Compare
caoccao
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM



Motivation
Resolves #38363
Changes
Under certai nconditions a bridge would not resume after losing its connection.