diff --git a/FINDINGS_BRIDGE.md b/FINDINGS_BRIDGE.md new file mode 100644 index 0000000000..4785c0a71f --- /dev/null +++ b/FINDINGS_BRIDGE.md @@ -0,0 +1,181 @@ +# Bridge Message Forwarding Stall - Root Cause Analysis + +## Summary + +The `RemoteMqttForwarder` can enter a permanent stall state after connection failures where messages continuously arrive on the source topic but are never forwarded, even after the bridge reports being connected. + +## Symptoms + +- Bridge logs show successful reconnection ("Bridge connected") +- No more message forwarding occurs after reconnection +- No warnings or errors are logged - complete silence +- New messages keep arriving but are not processed + +## Reproduction Scenario + +The issue occurs during rapid connect/disconnect cycles, typically caused by: +- Network instability +- Remote broker restarts +- TCP connection issues (broken pipe, channel output shutdown) + +Example log pattern: +``` +Bridge disconnected: Server closed connection without DISCONNECT +Bridge connected +WARN Unable to forward message... ChannelOutputShutdownException +Bridge disconnected +Bridge connected +WARN Unable to forward message... +Bridge disconnected +Bridge connected <-- Final reconnect, then SILENCE +``` + +## Root Cause + +The stall is caused by a combination of two bugs in the message forwarding pipeline: + +### Bug 1: Inflight Counter Imbalance in `drainQueue()` + +**Location:** `RemoteMqttForwarder.java:320-360` + +When `drainQueue()` sends buffered messages, it does NOT increment `inflightCounter`, but when those messages complete (success or failure), `finishProcessing()` DOES decrement it. + +```java +// drainQueue() - NO inflightCounter.incrementAndGet() here! +publishResult.whenComplete((result, throwable) -> { + // ... + finishProcessing(...); // This decrements inflightCounter! +}); +``` + +Compare with `onMessage()` which correctly increments before processing: +```java +public void onMessage(...) { + inflightCounter.incrementAndGet(); // Correct! + // ... eventually calls finishProcessing() which decrements +} +``` + +This causes `inflightCounter` to go negative after draining buffered messages. + +### Bug 2: Inflight Markers Not Cleared on Publish Failure + +**Location:** `RemoteMqttForwarder.java:303-316` and persistence layer + +When messages are read from the persistence queue via `readShared()`, they get an **inflight marker** set (packet ID assigned). This marker indicates the message is being processed. + +When a publish fails: +1. `handlePublishError()` logs the warning +2. `finishProcessing()` is called +3. `afterForwardCallback.afterMessage()` → `messageProcessed()` → `removeShared()` + +The problem: `removeShared()` tries to **remove** the message from persistence, but if the publish failed, the message should be **retried**, not removed. The inflight marker should be cleared so the message can be re-read. + +**Location of inflight marker skip:** `ClientQueueMemoryLocalPersistence.java:307-309` +```java +if (publishWithRetained.getPacketIdentifier() != NO_PACKET_ID) { + //already inflight + continue; // Message is SKIPPED! +} +``` + +### Bug 3: Missing `checkBuffers()` Trigger After Reconnection + +**Location:** `BridgeMqttClient.java:393-412` + +The connected listener calls `drainQueue()` but does NOT trigger `checkBuffers()`: +```java +builder.addConnectedListener(context -> { + connected.set(true); + forwarders.forEach(MqttForwarder::drainQueue); // Drains in-memory buffer + // Missing: No trigger to poll from persistence queue! +}); +``` + +If the in-memory buffer is empty (already drained in previous failed attempts), nothing triggers polling of new messages from persistence. + +## The Deadlock Sequence + +1. Connection drops during message forwarding +2. In-flight publishes fail with `ChannelOutputShutdownException` +3. Messages in persistence queue retain their inflight markers +4. Rapid reconnect/disconnect cycles occur +5. Each `drainQueue()` call decrements `inflightCounter` without incrementing +6. After final successful reconnect: + - In-memory buffer (`queue`) is empty + - `drainQueue()` returns immediately without triggering `checkBuffers()` + - Persistence queue has messages but they all have inflight markers + - `readShared()` skips all messages (inflight markers set) + - Returns null/empty → `notEmptyQueues.remove(queueId)` + - No mechanism triggers `checkBuffers()` again + - **Permanent stall** + +## Affected Files + +- `hivemq-edge/src/main/java/com/hivemq/bridge/mqtt/RemoteMqttForwarder.java` +- `hivemq-edge/src/main/java/com/hivemq/bridge/mqtt/BridgeMqttClient.java` +- `hivemq-edge/src/main/java/com/hivemq/bridge/MessageForwarderImpl.java` +- `hivemq-edge/src/main/java/com/hivemq/persistence/local/memory/ClientQueueMemoryLocalPersistence.java` + +## Recommended Fixes + +### Fix 1: Balance `inflightCounter` in `drainQueue()` + +Add `inflightCounter.incrementAndGet()` when sending messages in `drainQueue()`: + +```java +// In drainQueue(), before sending each buffered message: +inflightCounter.incrementAndGet(); +final CompletableFuture publishResult = + remoteMqttClient.getMqtt5Client().publish(convertPublishForClient(buffered.publish)); +``` + +### Fix 2: Clear Inflight Markers on Publish Failure + +When a publish fails, call `removeInFlightMarker()` instead of (or in addition to) `removeShared()`: + +```java +// In the publish failure path: +if (throwable != null) { + handlePublishError(publish, throwable); + // Clear inflight marker so message can be retried + resetInflightMarkerCallback.afterMessage(queueId, uniqueId); +} +``` + +### Fix 3: Trigger `checkBuffers()` After Reconnection + +In the connected listener, after draining queues, trigger a buffer check: + +```java +builder.addConnectedListener(context -> { + connected.set(true); + forwarders.forEach(MqttForwarder::drainQueue); + // Trigger polling from persistence queue + messageForwarder.checkBuffers(); // Need to pass reference or use event +}); +``` + +### Fix 4: Consider Adding Connection State Check Before Publish + +In `sendPublishToRemote()`, re-check connection state after acquiring the lock: + +```java +private synchronized void sendPublishToRemote(...) { + if (!remoteMqttClient.isConnected()) { + queue.add(new BufferedPublishInformation(...)); + // Also need to handle inflightCounter here! + finishProcessing(originalQoS, originalUniqueId, queueId); // Or similar + return; + } + // ... +} +``` + +## Testing Recommendations + +1. Use Toxiproxy or similar to simulate network failures during message forwarding +2. Test rapid connect/disconnect cycles (multiple within 1-2 seconds) +3. Verify message flow resumes after network stabilizes +4. Monitor `inflightCounter` values (add metrics/logging) +5. Verify inflight markers are properly cleared after failures \ No newline at end of file diff --git a/hivemq-edge/src/main/java/com/hivemq/bridge/BRIDGE_DATA_FLOW.md b/hivemq-edge/src/main/java/com/hivemq/bridge/BRIDGE_DATA_FLOW.md new file mode 100644 index 0000000000..143881a55d --- /dev/null +++ b/hivemq-edge/src/main/java/com/hivemq/bridge/BRIDGE_DATA_FLOW.md @@ -0,0 +1,301 @@ + +## Bridge Architecture Overview + +### Core Components + +``` +┌─────────────────────────────────────────────────────────────────────────┐ +│ HiveMQ Edge │ +│┌───────────────────────────────────────────────────────────────────────┐│ +││ BridgeService ││ +││ ┌──────────────────┐ ┌──────────────────┐ ┌──────────────────────┐ ││ +││ │ BridgeMqttClient│ │ MessageForwarder │ │ RemotePublishConsumer│ ││ +││ │ (to Remote) │ │ (Local→Remote) │ │ (Remote→Local) │ ││ +││ └────────┬─────────┘ └────────┬─────────┘ └──────────┬───────────┘ ││ +││ │ │ │ ││ +││ │ ┌────────────────┴────────────────┐ │ ││ +││ │ │ RemoteMqttForwarder │ │ ││ +││ │ │ - drainQueue() │ │ ││ +││ │ │ - sendPublish() │ │ ││ +││ │ │ - finishProcessing() │ │ ││ +││ │ │ - inflightCounter │ │ ││ +││ │ └────────────────┬────────────────┘ │ ││ +││ │ │ │ ││ +││ ┌────────┴─────────────────────┴───────────────────────┴────────┐ ││ +││ │ Persistence Layer │ ││ +││ │ - In-Memory Queue (default) │ ││ +││ │ - RocksDB Queue (FILE_NATIVE) │ ││ +││ │ - Inflight markers (packetId tracking) │ ││ +││ └───────────────────────────────────────────────────────────────┘ ││ +│└───────────────────────────────────────────────────────────────────────┘│ +└─────────────────────────────────────────────────────────────────────────┘ + │ + │ MQTT (TCP/TLS/WebSocket) + ▼ + ┌──────────────────────┐ + │ Remote Broker │ + └──────────────────────┘ +``` + +### Bridge Configuration Options + +| Option | Type | Description | Default | +|--------|------|-------------|---------| +| `id` | String | Unique bridge identifier | Required | +| `host` | String | Remote broker hostname | Required | +| `port` | int | Remote broker port | Required | +| `clientId` | String | MQTT client ID for bridge | Required | +| `keepAlive` | int | MQTT keep-alive interval (seconds) | 60 | +| `sessionExpiry` | long | Session expiry interval (seconds) | 3600 | +| `cleanStart` | boolean | Start with clean session | false | +| `username` | String | Authentication username | null | +| `password` | String | Authentication password | null | +| `bridgeTls` | BridgeTls | TLS configuration | null | +| `bridgeWebsocketConfig` | BridgeWebsocketConfig | WebSocket configuration | null | +| `localSubscriptions` | List | Topics to forward Local→Remote | [] | +| `remoteSubscriptions` | List | Topics to pull Remote→Local | [] | +| `loopPreventionEnabled` | boolean | Enable hop count tracking | true | +| `loopPreventionHopCount` | int | Maximum hop count | 1 | +| `persist` | boolean | Persist messages to disk | true | + +### LocalSubscription Options + +| Option | Type | Description | Default | +|--------|------|-------------|---------| +| `filters` | List | Topic filters to match | Required | +| `destination` | String | Destination topic pattern | null | +| `excludes` | List | Topic patterns to exclude | [] | +| `customUserProperties` | List | User properties to add | [] | +| `preserveRetain` | boolean | Preserve retain flag | false | +| `maxQoS` | int | Maximum QoS level (0, 1, 2) | 2 | +| `queueLimit` | Long | Per-subscription queue limit | null | + +### RemoteSubscription Options + +| Option | Type | Description | Default | +|--------|------|-------------|---------| +| `filters` | List | Topic filters to subscribe | Required | +| `destination` | String | Local destination topic pattern | null | +| `customUserProperties` | List | User properties to add | [] | +| `preserveRetain` | boolean | Preserve retain flag | false | +| `maxQoS` | int | Maximum QoS level (0, 1, 2) | 2 | + +--- + +## Data Flow Scenarios + +### 1. Local-to-Remote Forwarding (Push) + +```mermaid +sequenceDiagram + participant LP as Local Publisher + participant LE as Local Edge Broker + participant MF as MessageForwarder + participant RMF as RemoteMqttForwarder + participant Q as Queue (Memory/RocksDB) + participant BMC as BridgeMqttClient + participant RB as Remote Broker + + LP->>LE: PUBLISH (topic matches filter) + LE->>MF: onMessage(publish) + MF->>MF: Check loop prevention (hop count) + MF->>MF: Apply topic transformation + MF->>MF: Add custom user properties + + alt Bridge Connected + MF->>RMF: forward(message) + RMF->>RMF: inflightCounter.incrementAndGet() + RMF->>BMC: sendPublish(message) + BMC->>RB: PUBLISH + RB-->>BMC: PUBACK (QoS 1) / PUBREC-PUBREL-PUBCOMP (QoS 2) + BMC-->>RMF: onPublishComplete + RMF->>RMF: finishProcessing() + RMF->>RMF: inflightCounter.decrementAndGet() + else Bridge Disconnected + MF->>Q: store(message) + Note over Q: Message persisted with packetId marker + end +``` + +### 2. Remote-to-Local Subscription (Pull) + +```mermaid +sequenceDiagram + participant RB as Remote Broker + participant BMC as BridgeMqttClient + participant RPC as RemotePublishConsumer + participant BIH as BridgeInterceptorHandler + participant LE as Local Edge Broker + participant LS as Local Subscriber + + Note over BMC,RB: Bridge subscribes on connect + BMC->>RB: SUBSCRIBE (remote filters) + RB-->>BMC: SUBACK + + RB->>BMC: PUBLISH (matching topic) + BMC->>RPC: onPublish(message) + RPC->>RPC: Apply topic transformation + RPC->>RPC: Check loop prevention + RPC->>BIH: intercept(message) + BIH->>LE: publishToLocal(message) + LE->>LS: PUBLISH +``` + +### 3. Reconnection Flow (Normal) + +```mermaid +sequenceDiagram + participant RMF as RemoteMqttForwarder + participant Q as Queue + participant BMC as BridgeMqttClient + participant RB as Remote Broker + + Note over BMC,RB: Connection Lost + BMC-xRB: Connection broken + + Note over RMF: Messages queued during outage + loop New messages arrive + RMF->>Q: store(message) + end + + Note over BMC,RB: Reconnection + BMC->>RB: CONNECT + RB-->>BMC: CONNACK + BMC->>BMC: onConnected callback + + Note over RMF: Drain queued messages + RMF->>RMF: drainQueue() + loop For each queued message + RMF->>Q: readShared() + Q-->>RMF: message (with packetId marker) + RMF->>BMC: sendPublish(message) + BMC->>RB: PUBLISH + RB-->>BMC: PUBACK + RMF->>Q: removeShared(message) + end +``` + +### 4. Reconnection with Network Disruption + +```mermaid +sequenceDiagram + participant P as Publisher + participant RMF as RemoteMqttForwarder + participant Q as RocksDB Queue + participant BMC as BridgeMqttClient + participant NW as Network + participant RB as Remote Broker + + Note over RMF: Normal operation + P->>RMF: onMessage() + RMF->>RMF: inflightCounter++ (now 1) + RMF->>BMC: sendPublish() + + Note over NW: Network disruption + BMC->>NW: PUBLISH packet + NW-xBMC: Connection lost + Note over BMC: ChannelOutputShutdownException + + Note over RMF: Handle failed publish + RMF->>RMF: finishProcessing() + RMF->>RMF: inflightCounter-- (now 0) + RMF->>Q: Clear packetId markers + + Note over BMC,RB: Reconnection + BMC->>RB: CONNECT + RB-->>BMC: CONNACK + + Note over RMF: Drain queued messages + RMF->>RMF: drainQueue() + RMF->>Q: readShared() + Q-->>RMF: message (markers cleared, ready to send) + RMF->>BMC: sendPublish(message) + BMC->>RB: PUBLISH + RB-->>BMC: PUBACK + RMF->>Q: removeShared(message) + + Note over RMF: Bridge resumes forwarding +``` + +### 5. TLS Connection Flow + +```mermaid +sequenceDiagram + participant BMC as BridgeMqttClient + participant TLS as TLS Layer + participant RB as Remote Broker + + BMC->>TLS: Initialize SSLContext + Note over TLS: Load truststore (server cert validation) + Note over TLS: Load keystore (client cert - mutual TLS) + + BMC->>TLS: Connect + TLS->>RB: ClientHello + RB-->>TLS: ServerHello + Certificate + TLS->>TLS: Validate server certificate + + alt Mutual TLS + RB-->>TLS: CertificateRequest + TLS->>RB: Client Certificate + RB->>RB: Validate client certificate + end + + TLS->>RB: Finished + RB-->>TLS: Finished + Note over TLS: TLS Handshake Complete + + BMC->>RB: MQTT CONNECT (encrypted) + RB-->>BMC: CONNACK (encrypted) +``` + +### 6. WebSocket Connection Flow + +```mermaid +sequenceDiagram + participant BMC as BridgeMqttClient + participant WS as WebSocket Layer + participant RB as Remote Broker + + BMC->>WS: Configure WebSocket + Note over WS: path: /mqtt (configurable) + Note over WS: subProtocol: mqtt (configurable) + + WS->>RB: HTTP Upgrade Request + Note over WS: GET /mqtt HTTP/1.1 + Note over WS: Upgrade: websocket + Note over WS: Sec-WebSocket-Protocol: mqtt + + RB-->>WS: HTTP 101 Switching Protocols + Note over WS: WebSocket connection established + + BMC->>RB: MQTT CONNECT (over WebSocket frames) + RB-->>BMC: CONNACK +``` + +### 7. Loop Prevention Flow + +```mermaid +sequenceDiagram + participant E1 as Edge 1 + participant B1 as Bridge 1 + participant E2 as Edge 2 + participant B2 as Bridge 2 + + Note over E1,E2: Bidirectional bridge setup + Note over E1,E2: hopCount = 2 + + E1->>E1: Local publish (hopCount = 0) + E1->>B1: Forward to E2 + B1->>B1: Increment hopCount (now 1) + B1->>E2: PUBLISH with hopCount=1 + + E2->>B2: Forward back to E1? + B2->>B2: Increment hopCount (now 2) + B2->>E1: PUBLISH with hopCount=2 + + E1->>B1: Forward again? + B1->>B1: Check hopCount (2 >= maxHop 2) + Note over B1: BLOCKED - loop prevented! +``` + +--- diff --git a/hivemq-edge/src/main/java/com/hivemq/bridge/MessageForwarderImpl.java b/hivemq-edge/src/main/java/com/hivemq/bridge/MessageForwarderImpl.java index 8e0e086939..21f59d3e3b 100644 --- a/hivemq-edge/src/main/java/com/hivemq/bridge/MessageForwarderImpl.java +++ b/hivemq-edge/src/main/java/com/hivemq/bridge/MessageForwarderImpl.java @@ -50,6 +50,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -62,6 +63,7 @@ public class MessageForwarderImpl implements MessageForwarder { public static final @NotNull String FORWARDER_PREFIX = "forwarder#"; private static final @NotNull Logger log = LoggerFactory.getLogger(MessageForwarderImpl.class); + public static final int RESET_INFLIGHT_COUNTERS_TIMEOUT_IN_SECONDS = 30; private final @NotNull LocalTopicTree topicTree; private final @NotNull HivemqId hivemqId; @@ -160,8 +162,11 @@ public void addForwarder(final @NotNull MqttForwarder mqttForwarder) { forwarderId, queueId)); mqttForwarder.setResetInflightMarkerCallback((sharedSubscriptionId, uniqueId) -> { + final var qPersistence = queuePersistence.get(); try { - queuePersistence.get().removeInFlightMarker(sharedSubscriptionId, uniqueId).get(); + if(qPersistence != null) { + qPersistence.removeInFlightMarker(sharedSubscriptionId, uniqueId).get(); + } } catch (final InterruptedException | ExecutionException e) { if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); @@ -171,6 +176,61 @@ public void addForwarder(final @NotNull MqttForwarder mqttForwarder) { throw new RuntimeException(e); } }); + mqttForwarder.setResetAllInflightMarkersCallback((fwdId) -> { + // Reset ALL inflight markers for all queues associated with this forwarder. + // This is called on reconnection to handle messages that were read from persistence + // but never made it to the forwarder's local queues. + // + // IMPORTANT: We collect all futures and wait for them using Futures.allAsList to + // ensure all inflight markers are reset before onReconnect triggers checkBuffers(). + // This is safe because the persistence operations are submitted to SingleWriter + // and don't hold any locks that could cause deadlock. + final Set forwarderQueueIds = queueIdsForForwarder.get(fwdId); + if (forwarderQueueIds != null && !forwarderQueueIds.isEmpty()) { + if (log.isDebugEnabled()) { + log.debug("Resetting inflight markers for forwarder '{}', {} queue(s)", + fwdId, forwarderQueueIds.size()); + } + final ImmutableList.Builder> futuresBuilder = ImmutableList.builder(); + final var qPersistence = queuePersistence.get(); + if(qPersistence != null) { + for (final String queueIdToReset : forwarderQueueIds) { + futuresBuilder.add(qPersistence.removeAllInFlightMarkers(queueIdToReset)); + } + } + try { + // Wait for all inflight markers to be reset before returning + // This ensures onReconnect() will see clean queues when it triggers checkBuffers() + Futures.allAsList(futuresBuilder.build()).get(RESET_INFLIGHT_COUNTERS_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS); + if (log.isDebugEnabled()) { + log.debug("Reset all inflight markers for forwarder '{}', {} queue(s)", + fwdId, forwarderQueueIds.size()); + } + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("Interrupted while resetting inflight markers for forwarder '{}'", fwdId, e); + } catch (final ExecutionException e) { + log.error("Failed to reset inflight markers for forwarder '{}'", fwdId, e); + } catch (final TimeoutException e) { + log.warn("Timeout resetting inflight markers for forwarder '{}' - forcing reconnect to retry", fwdId, e); + final MqttForwarder forwarder = forwarders.get(fwdId); + if (forwarder != null) { + forwarder.forceReconnect(); + } + } + } + }); + mqttForwarder.setOnReconnectCallback(() -> { + if (log.isDebugEnabled()) { + log.debug("OnReconnect callback triggered for forwarder '{}', checking buffers", forwarderId); + } + // Re-add all queue IDs to notEmptyQueues to ensure they get polled after reconnect + final Set forwarderQueueIds = queueIdsForForwarder.get(forwarderId); + if (forwarderQueueIds != null) { + notEmptyQueues.addAll(forwarderQueueIds); + } + checkBuffers(); + }); forwarders.put(forwarderId, mqttForwarder); queueIdsForForwarder.put(forwarderId, queueIds); @@ -200,9 +260,12 @@ public void removeForwarder(final @NotNull MqttForwarder mqttForwarder, final bo final String queueId = createQueueId(forwarderId, topic); notEmptyQueues.remove(queueId); if (clearQueue) { - queuePersistence.get().clear(queueId, true); //clear up queue - if (log.isTraceEnabled()) { - log.trace("Cleared queue '{}' for forwarder '{}'", queueId, forwarderId); + final var qPersistence = queuePersistence.get(); + if(qPersistence != null) { + qPersistence.clear(queueId, true); //clear up queue + if (log.isTraceEnabled()) { + log.trace("Cleared queue '{}' for forwarder '{}'", queueId, forwarderId); + } } } } @@ -227,7 +290,10 @@ public void messageProcessed( //QoS 0 has no inflight marker if (qos != QoS.AT_MOST_ONCE) { //-- 15665 - > QoS 0 causes republishing - FutureUtils.addExceptionLogger(queuePersistence.get().removeShared(queueId, uniqueId)); + final var qPersistence = queuePersistence.get(); + if(qPersistence != null) { + FutureUtils.addExceptionLogger(qPersistence.removeShared(queueId, uniqueId)); + } } if (log.isTraceEnabled()) { @@ -411,27 +477,31 @@ private ListenableFuture pollForQueue( log.trace("Polling queue '{}' for forwarder '{}', batchSize: {}, byteLimit: {}", queueId, mqttForwarder.getId(), FORWARDER_POLL_THRESHOLD_MESSAGES, PUBLISH_POLL_BATCH_SIZE_BYTES); } - - return Futures.transformAsync(queuePersistence.get() - .readShared(queueId, FORWARDER_POLL_THRESHOLD_MESSAGES, PUBLISH_POLL_BATCH_SIZE_BYTES), publishes -> { - if (publishes == null) { - if (log.isTraceEnabled()) { - log.trace("Queue '{}' is empty, removing from non-empty queues", queueId); + final var qPersistence = queuePersistence.get(); + if(qPersistence != null) { + return Futures.transformAsync(qPersistence + .readShared(queueId, FORWARDER_POLL_THRESHOLD_MESSAGES, PUBLISH_POLL_BATCH_SIZE_BYTES), publishes -> { + if (publishes == null) { + if (log.isTraceEnabled()) { + log.trace("Queue '{}' is empty, removing from non-empty queues", queueId); + } + notEmptyQueues.remove(queueId); + return Futures.immediateFuture(false); } - notEmptyQueues.remove(queueId); - return Futures.immediateFuture(false); - } - final int messageCount = publishes.size(); - if (log.isDebugEnabled()) { - log.debug("Retrieved {} message(s) from queue '{}' for forwarder '{}'", - messageCount, queueId, mqttForwarder.getId()); - } + final int messageCount = publishes.size(); + if (log.isDebugEnabled()) { + log.debug("Retrieved {} message(s) from queue '{}' for forwarder '{}'", + messageCount, queueId, mqttForwarder.getId()); + } - for (final PUBLISH publish : publishes) { - mqttForwarder.onMessage(publish, queueId); - } - return Futures.immediateFuture(!publishes.isEmpty()); - }, executorService); + for (final PUBLISH publish : publishes) { + mqttForwarder.onMessage(publish, queueId); + } + return Futures.immediateFuture(!publishes.isEmpty()); + }, executorService); + } else { + return Futures.immediateFuture(false); + } } } diff --git a/hivemq-edge/src/main/java/com/hivemq/bridge/MqttForwarder.java b/hivemq-edge/src/main/java/com/hivemq/bridge/MqttForwarder.java index e0da598352..1f328bff8d 100644 --- a/hivemq-edge/src/main/java/com/hivemq/bridge/MqttForwarder.java +++ b/hivemq-edge/src/main/java/com/hivemq/bridge/MqttForwarder.java @@ -34,10 +34,39 @@ public interface MqttForwarder { void drainQueue(); + /** + * Sends any messages that were buffered in-memory while the remote broker was disconnected. + * Unlike {@link #drainQueue()}, this does NOT reset inflight markers in persistence - + * it only sends messages that are already in the local queue buffer. + *

+ * This should be called on initial connection when messages may have been buffered + * while waiting for the remote broker to become available. + */ + void flushBufferedMessages(); + void setAfterForwardCallback(@NotNull MqttForwarder.AfterForwardCallback callback); void setResetInflightMarkerCallback(@NotNull MqttForwarder.ResetInflightMarkerCallback callback); + void setResetAllInflightMarkersCallback(@NotNull MqttForwarder.ResetAllInflightMarkersCallback callback); + + void setOnReconnectCallback(@NotNull MqttForwarder.OnReconnectCallback callback); + + /** + * Called after the remote client reconnects. This triggers the reconnect callback + * which should poll from the persistence queue for any messages that need to be retried. + */ + void onReconnect(); + + /** + * Forces a reconnection by disconnecting the underlying MQTT client. + * The auto-reconnect logic will then establish a new connection. + *

+ * This is used to recover from timeout situations where inflight markers + * could not be reset within the expected time. Forcing a reconnection + * triggers a new {@link #drainQueue()} cycle which will retry the marker reset. + */ + void forceReconnect(); void setExecutorService(@NotNull ExecutorService executorService); @@ -54,4 +83,14 @@ interface AfterForwardCallback { interface ResetInflightMarkerCallback { void afterMessage(@NotNull String sharedSubscription, @NotNull String uniqueId); } + + @FunctionalInterface + interface ResetAllInflightMarkersCallback { + void resetAll(@NotNull String sharedSubscription); + } + + @FunctionalInterface + interface OnReconnectCallback { + void onReconnect(); + } } diff --git a/hivemq-edge/src/main/java/com/hivemq/bridge/mqtt/BridgeMqttClient.java b/hivemq-edge/src/main/java/com/hivemq/bridge/mqtt/BridgeMqttClient.java index 63bdfe32c8..67e697fc2b 100644 --- a/hivemq-edge/src/main/java/com/hivemq/bridge/mqtt/BridgeMqttClient.java +++ b/hivemq-edge/src/main/java/com/hivemq/bridge/mqtt/BridgeMqttClient.java @@ -100,6 +100,7 @@ public class BridgeMqttClient { private final @NotNull AtomicReference> startFutureRef; private final @NotNull AtomicReference> stopFutureRef; private final @NotNull AtomicBoolean stopped; + private final @NotNull AtomicBoolean everConnected; private final @NotNull List forwarders; public BridgeMqttClient( @@ -119,6 +120,7 @@ public BridgeMqttClient( this.perBridgeMetrics = new PerBridgeMetrics(bridge.getId(), metricRegistry); this.connected = new AtomicBoolean(); this.stopped = new AtomicBoolean(); + this.everConnected = new AtomicBoolean(); this.forwarders = Collections.synchronizedList(new ArrayList<>()); this.executorService = MoreExecutors.newDirectExecutorService(); this.operationState = new AtomicReference<>(OperationState.IDLE); @@ -175,11 +177,8 @@ public BridgeMqttClient( log.debug("Bridge '{}' MQTT connection established in {} μs", bridge.getId(), connectMicros); } - final int forwarderCount = forwarders.size(); - if (forwarderCount > 0 && log.isDebugEnabled()) { - log.debug("Draining queues for {} forwarder(s) on bridge '{}'", forwarderCount, bridge.getId()); - } - forwarders.forEach(MqttForwarder::drainQueue); + // Note: drainQueue() and onReconnect() are handled by addConnectedListener + // which correctly distinguishes between initial connections and reconnections. final ImmutableList.Builder<@NotNull CompletableFuture> subFutures = new ImmutableList.Builder<>(); @@ -400,13 +399,31 @@ public boolean isConnected() { } log.info("Bridge '{}' connected to {}:{}", bridge.getId(), bridge.getHost(), bridge.getPort()); connected.set(true); + + // Check if this is a reconnection (not the initial connection) + // On initial connection, we only flush buffered messages without resetting persistence state. + // On reconnection, we need to drain all stale state and re-poll from persistence. + final boolean isReconnection = !everConnected.compareAndSet(false, true); + final int forwarderCount = forwarders.size(); if (forwarderCount > 0) { - if (log.isDebugEnabled()) { - log.debug("Draining queues for {} forwarder(s) after reconnection on bridge '{}'", - forwarderCount, bridge.getId()); + if (isReconnection) { + if (log.isDebugEnabled()) { + log.debug("Draining queues for {} forwarder(s) after reconnection on bridge '{}'", + forwarderCount, bridge.getId()); + } + forwarders.forEach(MqttForwarder::drainQueue); + // Trigger checkBuffers() to poll from persistence queue for messages that need retrying + forwarders.forEach(MqttForwarder::onReconnect); + } else { + // Initial connection: send any messages that were buffered while waiting for remote + // but do NOT reset persistence inflight markers (to avoid duplicate delivery on restart) + if (log.isDebugEnabled()) { + log.debug("Flushing buffered messages for {} forwarder(s) on initial connection to bridge '{}'", + forwarderCount, bridge.getId()); + } + forwarders.forEach(MqttForwarder::flushBufferedMessages); } - forwarders.forEach(MqttForwarder::drainQueue); } eventBuilder(Event.SEVERITY.INFO).withMessage("Bridge '" + bridge.getId() + "' connected").fire(); }); diff --git a/hivemq-edge/src/main/java/com/hivemq/bridge/mqtt/RemoteMqttForwarder.java b/hivemq-edge/src/main/java/com/hivemq/bridge/mqtt/RemoteMqttForwarder.java index 646859db9a..c8bcf461f5 100644 --- a/hivemq-edge/src/main/java/com/hivemq/bridge/mqtt/RemoteMqttForwarder.java +++ b/hivemq-edge/src/main/java/com/hivemq/bridge/mqtt/RemoteMqttForwarder.java @@ -69,10 +69,13 @@ public class RemoteMqttForwarder implements MqttForwarder { private final @NotNull BridgeInterceptorHandler bridgeInterceptorHandler; private final AtomicInteger inflightCounter = new AtomicInteger(0); private final AtomicBoolean running = new AtomicBoolean(false); + private final AtomicBoolean draining = new AtomicBoolean(false); private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); private final ConcurrentLinkedQueue outflightQueue = new ConcurrentLinkedQueue<>(); private volatile @Nullable MqttForwarder.AfterForwardCallback afterForwardCallback; private volatile @Nullable ResetInflightMarkerCallback resetInflightMarkerCallback; + private volatile @Nullable ResetAllInflightMarkersCallback resetAllInflightMarkersCallback; + private volatile @Nullable OnReconnectCallback onReconnectCallback; private volatile @Nullable ExecutorService executorService; public RemoteMqttForwarder( @@ -121,6 +124,7 @@ public synchronized void stop() { int clearedQueued = 0; BufferedPublishInformation queueMessage = queue.poll(); while (queueMessage != null) { + final var resetInflightMarkerCallback = this.resetInflightMarkerCallback; if (resetInflightMarkerCallback != null) { resetInflightMarkerCallback.afterMessage(queueMessage.queueId, queueMessage.publish.getUniqueId()); } @@ -165,6 +169,7 @@ public void onMessage(final @NotNull PUBLISH publish, final @NotNull String queu if (log.isTraceEnabled()) { log.trace("Forwarder '{}' not running, dropping message on topic '{}'", id, publish.getTopic()); } + final var resetInflightMarkerCallback = this.resetInflightMarkerCallback; if (resetInflightMarkerCallback != null) { resetInflightMarkerCallback.afterMessage(queueId, publish.getUniqueId()); } @@ -253,11 +258,28 @@ private void finishProcessing( final @NotNull String uniqueId, final @NotNull String queueId) { inflightCounter.decrementAndGet(); + final var afterForwardCallback = this.afterForwardCallback; if (afterForwardCallback != null) { afterForwardCallback.afterMessage(originalQoS, uniqueId, queueId, false); } } + /** + * Called when a publish fails - resets the inflight marker so the message can be retried + * instead of being removed from persistence. This prevents message loss on transient failures. + */ + private void finishProcessingWithRetry( + final @NotNull QoS originalQoS, + final @NotNull String uniqueId, + final @NotNull String queueId) { + inflightCounter.decrementAndGet(); + // Reset inflight marker instead of removing the message, allowing retry + final var resetInflightMarkerCallback = this.resetInflightMarkerCallback; + if (resetInflightMarkerCallback != null) { + resetInflightMarkerCallback.afterMessage(queueId, uniqueId); + } + } + private @NotNull PUBLISH convertPublishAfterBridge(final @NotNull PUBLISH publish, final int hopCount) { final MqttTopic modifiedTopic = convertTopic(localSubscription.getDestination(), publish.getTopic()); final QoS modifiedQoS = convertQos(localSubscription.getMaxQoS(), publish.getQoS()); @@ -285,13 +307,10 @@ private synchronized void sendPublishToRemote( return; } - // first send the publishes that are inflight - final int bufferSize = queue.size(); - if (bufferSize > 0 && log.isDebugEnabled()) { - log.debug("Draining {} buffered message(s) before sending new message for bridge '{}'", - bufferSize, bridge.getId()); - } - drainQueue(); + // First send any buffered messages that accumulated while disconnected. + // Note: This does NOT reset inflight markers - these messages were already + // marked as inflight when they were originally polled from persistence. + sendBufferedMessages(); final long publishStartTime = log.isDebugEnabled() ? System.nanoTime() : 0; final Mqtt5Publish mqtt5Publish = convertPublishForClient(publish); @@ -303,6 +322,9 @@ private synchronized void sendPublishToRemote( publishResult.whenComplete((mqtt5PublishResult, throwable) -> { if (throwable != null) { handlePublishError(publish, throwable); + // On failure, reset the inflight marker so the message can be retried + // instead of being removed from persistence + finishProcessingWithRetry(originalQoS, originalUniqueId, queueId); } else { perBridgeMetrics.getPublishForwardSuccessCounter().inc(); if (log.isDebugEnabled()) { @@ -310,52 +332,134 @@ private synchronized void sendPublishToRemote( log.debug("Successfully published message on topic '{}' to remote broker for bridge '{}' in {} μs", publish.getTopic(), bridge.getId(), durationMicros); } + finishProcessing(originalQoS, originalUniqueId, queueId); } - finishProcessing(originalQoS, originalUniqueId, queueId); outflightQueue.remove(outflightPublishInformation); }); } - @Override - public synchronized void drainQueue() { - final int initialQueueSize = queue.size(); - if (initialQueueSize > 0 && log.isDebugEnabled()) { - log.debug("Draining {} buffered message(s) for bridge '{}'", initialQueueSize, bridge.getId()); + /** + * Sends buffered messages that accumulated while disconnected. + * Does NOT reset inflight markers - these messages were already marked as inflight + * when they were originally polled from persistence. + */ + private synchronized void sendBufferedMessages() { + final int bufferSize = queue.size(); + if (bufferSize > 0 && log.isDebugEnabled()) { + log.debug("Sending {} buffered message(s) for bridge '{}'", bufferSize, bridge.getId()); } - int drainedCount = 0; BufferedPublishInformation buffered = queue.poll(); - while (buffered != null) { - drainedCount++; - if (log.isTraceEnabled()) { - log.trace("Sending buffered message on topic '{}' ({}/{}) for bridge '{}'", - buffered.publish.getTopic(), drainedCount, initialQueueSize, bridge.getId()); - } - + while (buffered != null && remoteMqttClient.isConnected()) { + final BufferedPublishInformation current = buffered; + final long publishStartTime = log.isDebugEnabled() ? System.nanoTime() : 0; + final Mqtt5Publish mqtt5Publish = convertPublishForClient(current.publish); final CompletableFuture publishResult = - remoteMqttClient.getMqtt5Client().publish(convertPublishForClient(buffered.publish)); + remoteMqttClient.getMqtt5Client().publish(mqtt5Publish); final OutflightPublishInformation outflightPublishInformation = - new OutflightPublishInformation(buffered.queueId, buffered.publish.getUniqueId()); + new OutflightPublishInformation(current.queueId, current.uniqueId); outflightQueue.add(outflightPublishInformation); - - // lambdas hate this trick. (we need a final variable for the lamdba) - final BufferedPublishInformation finalBufferedPublishInformation = buffered; publishResult.whenComplete((mqtt5PublishResult, throwable) -> { if (throwable != null) { - handlePublishError(finalBufferedPublishInformation.publish, throwable); + handlePublishError(current.publish, throwable); + finishProcessingWithRetry(current.originalQoS, current.uniqueId, current.queueId); } else { perBridgeMetrics.getPublishForwardSuccessCounter().inc(); + if (log.isDebugEnabled()) { + final long durationMicros = (System.nanoTime() - publishStartTime) / 1000; + log.debug("Successfully published buffered message on topic '{}' to remote broker for bridge '{}' in {} μs", + current.publish.getTopic(), bridge.getId(), durationMicros); + } + finishProcessing(current.originalQoS, current.uniqueId, current.queueId); } - finishProcessing(finalBufferedPublishInformation.originalQqS, - finalBufferedPublishInformation.uniqueId, - finalBufferedPublishInformation.queueId); outflightQueue.remove(outflightPublishInformation); }); buffered = queue.poll(); } + } - if (drainedCount > 0 && log.isDebugEnabled()) { - log.debug("Drained {} buffered message(s) for bridge '{}'", drainedCount, bridge.getId()); + @Override + public void flushBufferedMessages() { + // This method is called on initial connection to send messages that were buffered + // while waiting for the remote broker to become available. + // Unlike drainQueue(), this does NOT reset persistence inflight markers. + if (log.isDebugEnabled()) { + log.debug("Flushing {} buffered message(s) for forwarder '{}' on bridge '{}'", + queue.size(), id, bridge.getId()); + } + sendBufferedMessages(); + } + + @Override + public void drainQueue() { + // Called on reconnection to reset all in-flight state. + // This method is NOT synchronized to avoid deadlock with sendPublishToRemote callbacks. + // Instead, we use a draining flag to coordinate with message processing. + + // Use compareAndSet to ensure only one drain operation runs at a time + if (!draining.compareAndSet(false, true)) { + if (log.isDebugEnabled()) { + log.debug("drainQueue() already in progress for forwarder '{}', skipping", id); + } + return; + } + + try { + if (log.isDebugEnabled()) { + log.debug("drainQueue() entered for forwarder '{}' on bridge '{}'", id, bridge.getId()); + } + + // Clear stale outflight messages from the previous connection FIRST. + // These were sent but their completion callbacks might not have been called. + // Clearing these before resetting inflight markers prevents race conditions. + int clearedOutflight = 0; + OutflightPublishInformation staleOutflight = outflightQueue.poll(); + while (staleOutflight != null) { + clearedOutflight++; + staleOutflight = outflightQueue.poll(); + } + + // Clear the in-memory queue buffer as well. + // These messages were buffered while disconnected and should be retried from persistence + // to maintain proper ordering and avoid duplicates. + int clearedQueued = 0; + BufferedPublishInformation queuedMessage = queue.poll(); + while (queuedMessage != null) { + clearedQueued++; + queuedMessage = queue.poll(); + } + + // Reset the inflight counter since we've just reconnected and there are no + // messages actually in flight anymore. Any pending completion handlers from + // the old connection will decrement harmlessly. + final int previousInflight = inflightCounter.getAndSet(0); + + if (log.isDebugEnabled() && (clearedOutflight > 0 || clearedQueued > 0 || previousInflight > 0)) { + log.debug("Reconnection reset for bridge '{}': cleared {} outflight, {} queued messages, " + + "reset inflightCounter from {} to 0", + bridge.getId(), clearedOutflight, clearedQueued, previousInflight); + } + + // CRITICAL: Reset ALL inflight markers in persistence for all queues this forwarder handles. + // This handles the case where messages were read from persistence (marking them as in-flight) + // but never made it to our local queues (e.g., they were in the interceptor chain when + // the connection dropped). Without this, those messages would remain stuck in persistence + // with inflight markers and never be re-delivered. + // + // This is done AFTER clearing local state to ensure any concurrent message processing + // sees the reset state before new messages arrive from persistence. + final var resetAllInflightMarkersCallback = this.resetAllInflightMarkersCallback; + if (resetAllInflightMarkersCallback != null) { + if (log.isDebugEnabled()) { + log.debug("drainQueue() calling resetAllInflightMarkersCallback for forwarder '{}'", id); + } + resetAllInflightMarkersCallback.resetAll(id); + if (log.isDebugEnabled()) { + log.debug("drainQueue() resetAllInflightMarkersCallback completed for forwarder '{}'", id); + } + } + } finally { + draining.set(false); } } @@ -480,6 +584,42 @@ public void setResetInflightMarkerCallback(final @NotNull ResetInflightMarkerCal resetInflightMarkerCallback = callback; } + @Override + public void setResetAllInflightMarkersCallback(final @NotNull ResetAllInflightMarkersCallback callback) { + resetAllInflightMarkersCallback = callback; + } + + @Override + public void setOnReconnectCallback(final @NotNull OnReconnectCallback callback) { + onReconnectCallback = callback; + } + + @Override + public void onReconnect() { + if (log.isDebugEnabled()) { + log.debug("Forwarder '{}' notified of reconnection for bridge '{}'", id, bridge.getId()); + } + // Trigger the callback to poll from persistence queue for messages that need to be retried + final var onReconnectCallback = this.onReconnectCallback; + if (onReconnectCallback != null) { + onReconnectCallback.onReconnect(); + } + } + + @Override + public void forceReconnect() { + if (remoteMqttClient.isConnected()) { + log.warn("Force reconnect triggered for forwarder '{}' on bridge '{}' - disconnecting to trigger auto-reconnect", + id, bridge.getId()); + remoteMqttClient.getMqtt5Client().disconnect(); + } else { + if (log.isDebugEnabled()) { + log.debug("Force reconnect requested but client already disconnected for forwarder '{}' on bridge '{}'", + id, bridge.getId()); + } + } + } + @Override public @NotNull String getId() { return id; @@ -501,16 +641,16 @@ public void setExecutorService(final @NotNull ExecutorService service) { executorService = service; } - private record BufferedPublishInformation(@NotNull String queueId, String uniqueId, @NotNull QoS originalQqS, + private record BufferedPublishInformation(@NotNull String queueId, String uniqueId, @NotNull QoS originalQoS, @NotNull PUBLISH publish) { private BufferedPublishInformation( final @NotNull String queueId, final @NotNull String uniqueId, - final @NotNull QoS originalQqS, + final @NotNull QoS originalQoS, final @NotNull PUBLISH publish) { this.queueId = queueId; this.uniqueId = uniqueId; - this.originalQqS = originalQqS; + this.originalQoS = originalQoS; this.publish = publish; } } diff --git a/hivemq-edge/src/main/java/com/hivemq/persistence/clientqueue/ClientQueueLocalPersistence.java b/hivemq-edge/src/main/java/com/hivemq/persistence/clientqueue/ClientQueueLocalPersistence.java index 00ae417c78..7fad369675 100644 --- a/hivemq-edge/src/main/java/com/hivemq/persistence/clientqueue/ClientQueueLocalPersistence.java +++ b/hivemq-edge/src/main/java/com/hivemq/persistence/clientqueue/ClientQueueLocalPersistence.java @@ -245,4 +245,13 @@ ImmutableList readInflight( * @param bucketIndex provided by the single writer */ void removeInFlightMarker(@NotNull String queueId, @NotNull String uniqueId, int bucketIndex); + + /** + * Remove all in-flight markers for a queue. + * This is called when a bridge reconnects to reset the state and allow messages to be re-delivered. + * + * @param queueId for which all markers should be removed + * @param bucketIndex provided by the single writer + */ + void removeAllInFlightMarkers(@NotNull String queueId, int bucketIndex); } diff --git a/hivemq-edge/src/main/java/com/hivemq/persistence/clientqueue/ClientQueuePersistence.java b/hivemq-edge/src/main/java/com/hivemq/persistence/clientqueue/ClientQueuePersistence.java index 741cbd5f1e..cffaba0668 100644 --- a/hivemq-edge/src/main/java/com/hivemq/persistence/clientqueue/ClientQueuePersistence.java +++ b/hivemq-edge/src/main/java/com/hivemq/persistence/clientqueue/ClientQueuePersistence.java @@ -196,6 +196,17 @@ ListenableFuture> readShared( @NotNull ListenableFuture removeInFlightMarker(@NotNull String sharedSubscription, @NotNull String uniqueId); + /** + * Remove all in-flight markers for a shared subscription. + * This is called when a bridge reconnects to reset the state and allow messages to be re-delivered. + *

+ * This method is only used for shared subscription queues. + * + * @param sharedSubscription for which all in-flight markers should be removed + */ + @NotNull + ListenableFuture removeAllInFlightMarkers(@NotNull String sharedSubscription); + /** * Removes all qos 0 messages from a queue * diff --git a/hivemq-edge/src/main/java/com/hivemq/persistence/clientqueue/ClientQueuePersistenceImpl.java b/hivemq-edge/src/main/java/com/hivemq/persistence/clientqueue/ClientQueuePersistenceImpl.java index 3400fbcf06..8c42fffef0 100644 --- a/hivemq-edge/src/main/java/com/hivemq/persistence/clientqueue/ClientQueuePersistenceImpl.java +++ b/hivemq-edge/src/main/java/com/hivemq/persistence/clientqueue/ClientQueuePersistenceImpl.java @@ -399,6 +399,17 @@ public ListenableFuture removeInFlightMarker( }); } + @NotNull + @Override + public ListenableFuture removeAllInFlightMarkers(final @NotNull String sharedSubscription) { + return singleWriter.submit(sharedSubscription, (bucketIndex) -> { + localPersistence.removeAllInFlightMarkers(sharedSubscription, bucketIndex); + // We notify the clients that there are new messages to poll. + sharedPublishAvailable(sharedSubscription); + return null; + }); + } + @NotNull @Override public ListenableFuture removeAllQos0Messages(final @NotNull String queueId, final boolean shared) { diff --git a/hivemq-edge/src/main/java/com/hivemq/persistence/local/memory/ClientQueueMemoryLocalPersistence.java b/hivemq-edge/src/main/java/com/hivemq/persistence/local/memory/ClientQueueMemoryLocalPersistence.java index 0aadeb4f95..713dd8fa0e 100644 --- a/hivemq-edge/src/main/java/com/hivemq/persistence/local/memory/ClientQueueMemoryLocalPersistence.java +++ b/hivemq-edge/src/main/java/com/hivemq/persistence/local/memory/ClientQueueMemoryLocalPersistence.java @@ -774,6 +774,29 @@ public void removeInFlightMarker( } } + /** + * {@inheritDoc} + */ + @Override + @ExecuteInSingleWriter + public void removeAllInFlightMarkers(final @NotNull String queueId, final int bucketIndex) { + checkNotNull(queueId, "QueueId must not be null"); + ThreadPreConditions.startsWith(SINGLE_WRITER_THREAD_PREFIX); + + final Map bucket = sharedBuckets[bucketIndex]; + final Messages messages = bucket.get(queueId); + if (messages == null) { + return; + } + + for (final MessageWithID messageWithID : messages.qos1Or2Messages) { + if (messageWithID instanceof PublishWithRetained) { + final PublishWithRetained publish = (PublishWithRetained) messageWithID; + publish.setPacketIdentifier(NO_PACKET_ID); + } + } + } + @Override @ExecuteInSingleWriter public void closeDB(final int bucketIndex) { diff --git a/modules/hivemq-edge-module-modbus/src/test/resources/legacy-modbus-adapter-full-config.xml b/modules/hivemq-edge-module-modbus/src/test/resources/legacy-modbus-adapter-full-config.xml deleted file mode 100644 index a19c594bd0..0000000000 --- a/modules/hivemq-edge-module-modbus/src/test/resources/legacy-modbus-adapter-full-config.xml +++ /dev/null @@ -1,74 +0,0 @@ - - - - - my-modbus-protocol-adapter-full - 10 - 9 - 1234 - my.modbus-server.com - false - - - my/topic - 1 - MQTTMessagePerSubscription - - 11 - 13 - - - - name - value1 - - - name - value2 - - - true - false - - - my/topic/2 - 1 - MQTTMessagePerSubscription - - 11 - 13 - - - - name - value1 - - - name - value2 - - - true - false - - - 1337 - - - diff --git a/modules/hivemq-edge-module-modbus/src/test/resources/legacy-modbus-adapter-minimal-config.xml b/modules/hivemq-edge-module-modbus/src/test/resources/legacy-modbus-adapter-minimal-config.xml deleted file mode 100644 index 4d116cefff..0000000000 --- a/modules/hivemq-edge-module-modbus/src/test/resources/legacy-modbus-adapter-minimal-config.xml +++ /dev/null @@ -1,35 +0,0 @@ - - - - - my-modbus-protocol-adapter-min - 1234 - my.modbus-server.com - - - my/topic - - 11 - 13 - - - - - -