Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add request problem information support
- Add reason string to incoming PUBACK, PUBREC, PUBREL, PUBCOMP, SUBACK, UNSUBACK
- Detect protocol error when server sends user properties (only when `MAX_USER_PROPERTIES` > 0) or a reason string in the PUBACK, PUBREC, PUBREL, PUBCOMP, SUBACK and UNSUBACK packets
- Fix client not dropping network connection on receival of a DISCONNECT packet leading to a panic on subsequent call to `Client::abort` in debug builds

## 0.5.1 - 2026-04-10

Expand Down
4 changes: 4 additions & 0 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1598,6 +1598,10 @@ impl<
.recv_body::<DisconnectPacket<MAX_USER_PROPERTIES>>(&header)
.await?;

// The server initiated the disconnect. We must close the transport on our side
// as well so that subsequent error handling (e.g. `abort`) sees a non-Ok network state.
self.raw.close_with(None);

return Err(MqttError::Disconnect {
reason: disconnect.reason_code,
reason_string: disconnect.reason_string.map(Property::into_inner),
Expand Down
21 changes: 16 additions & 5 deletions tests/integration/connect_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,9 @@ async fn keep_alive_not_kept_alive_idle_network() {
server_reference: _,
} | MqttError::Network(_)
));

// A non-recoverable poll error should always allow calling abort without panicking.
c.abort().await;
}

#[tokio::test]
Expand Down Expand Up @@ -635,15 +638,21 @@ async fn keep_alive_not_kept_alive_incoming_qos0() {

assert_ok!(
timeout(Duration::from_secs(4), async {
loop {
if let Err(MqttError::Network(_)) = rx.poll().await {
break;
}
}
while !matches!(
rx.poll().await,
Err(MqttError::Disconnect {
reason: ReasonCode::KeepAliveTimeout,
reason_string: _,
user_properties: _,
server_reference: _,
}) | Err(MqttError::Network(_))
) {}
})
.await,
"expected to be disconnected"
);
// A non-recoverable poll error should always allow calling abort without panicking.
rx.abort().await;
};

join!(receiver, publisher);
Expand Down Expand Up @@ -695,6 +704,8 @@ async fn keep_alive_not_kept_alive_will_timing() {
server_reference: _,
} | MqttError::Network(_)
));
// A non-recoverable poll error should always allow calling abort without panicking.
tx.abort().await;

disconnect(&mut rx, DEFAULT_DC_OPTIONS).await;
}
Expand Down