Skip to content

Commit 575a9ab

Browse files
committed
Terminate request queue on connection error first before terminating response handlers.
Terminating the request queue will ensure that no new requests will be accepted and thus that we could miss a response handler to be terminated. [#245] Signed-off-by: Mark Paluch <[email protected]>
1 parent 98d5cbd commit 575a9ab

File tree

1 file changed

+5
-15
lines changed

1 file changed

+5
-15
lines changed

src/main/java/io/r2dbc/mssql/client/ReactorNettyClient.java

+5-15
Original file line numberDiff line numberDiff line change
@@ -36,25 +36,15 @@
3636
import io.r2dbc.mssql.message.header.PacketIdProvider;
3737
import io.r2dbc.mssql.message.tds.ProtocolException;
3838
import io.r2dbc.mssql.message.tds.Redirect;
39-
import io.r2dbc.mssql.message.token.AbstractDoneToken;
40-
import io.r2dbc.mssql.message.token.AbstractInfoToken;
41-
import io.r2dbc.mssql.message.token.Attention;
42-
import io.r2dbc.mssql.message.token.EnvChangeToken;
43-
import io.r2dbc.mssql.message.token.FeatureExtAckToken;
44-
import io.r2dbc.mssql.message.token.LoginAckToken;
39+
import io.r2dbc.mssql.message.token.*;
4540
import io.r2dbc.mssql.message.type.Collation;
4641
import io.r2dbc.mssql.util.Assert;
4742
import io.r2dbc.spi.R2dbcException;
4843
import io.r2dbc.spi.R2dbcNonTransientResourceException;
4944
import org.reactivestreams.Publisher;
5045
import org.reactivestreams.Subscription;
5146
import reactor.core.CoreSubscriber;
52-
import reactor.core.publisher.EmitterProcessor;
53-
import reactor.core.publisher.Flux;
54-
import reactor.core.publisher.Mono;
55-
import reactor.core.publisher.MonoSink;
56-
import reactor.core.publisher.Sinks;
57-
import reactor.core.publisher.SynchronousSink;
47+
import reactor.core.publisher.*;
5848
import reactor.netty.Connection;
5949
import reactor.netty.NettyOutbound;
6050
import reactor.netty.resources.ConnectionProvider;
@@ -363,7 +353,8 @@ private Object encodeForSend(ClientMessage message) {
363353
@SuppressWarnings("unchecked")
364354
private <T> Mono<T> resumeError(Throwable throwable) {
365355

366-
handleConnectionError(throwable);
356+
logger.error(this.context.getMessage("Error: {}"), throwable.getMessage(), throwable);
357+
367358
this.requestSink.emitComplete((signalType, emitResult) -> {
368359

369360
if (emitResult.isFailure()) {
@@ -373,8 +364,7 @@ private <T> Mono<T> resumeError(Throwable throwable) {
373364
return false;
374365
});
375366

376-
logger.error(this.context.getMessage("Error: {}"), throwable.getMessage(), throwable);
377-
367+
handleConnectionError(throwable);
378368
return (Mono<T>) close();
379369
}
380370

0 commit comments

Comments
 (0)