1818import java .util .Map ;
1919import java .util .concurrent .CompletableFuture ;
2020import java .util .concurrent .ConcurrentHashMap ;
21+ import java .util .concurrent .atomic .AtomicReference ;
2122
2223/**
2324 * The default {@link ConnectionSocket.Factory} and {@link ResponsePump.Factory} for any default connections.
@@ -177,6 +178,7 @@ public String toString() {
177178 }
178179
179180 private static class ThreadResponsePump implements ResponsePump {
181+ private final AtomicReference <Throwable > shutdownReason = new AtomicReference <>();
180182 private final Thread thread ;
181183 private Map <Long , CompletableFuture <Response >> awaiting = new ConcurrentHashMap <>();
182184
@@ -220,7 +222,7 @@ public ThreadResponsePump(ConnectionSocket socket, boolean daemon) {
220222 @ Override
221223 public @ NotNull CompletableFuture <Response > await (long token ) {
222224 if (awaiting == null ) {
223- throw new ReqlDriverError ("Response pump closed." );
225+ throw new ReqlDriverError ("Response pump closed." , shutdownReason . get () );
224226 }
225227 CompletableFuture <Response > future = new CompletableFuture <>();
226228 awaiting .put (token , future );
@@ -234,6 +236,7 @@ public boolean isAlive() {
234236
235237 private void shutdown (Throwable t ) {
236238 Map <Long , CompletableFuture <Response >> awaiting = this .awaiting ;
239+ this .shutdownReason .compareAndSet (null , t );
237240 this .awaiting = null ;
238241 thread .interrupt ();
239242 if (awaiting != null ) {
@@ -243,7 +246,7 @@ private void shutdown(Throwable t) {
243246
244247 @ Override
245248 public void shutdownPump () {
246- shutdown (new ReqlDriverError ( "Response pump closed ." ));
249+ shutdown (new Throwable ( "Shutdown was requested ." ));
247250 }
248251
249252 @ Override
0 commit comments