Skip to content

Commit 40df5df

Browse files
committed
Configurable retries for declarative transactions
Added ability to retry transactions executed via `Session#readTransaction()` and `Session#writeTransaction()` methods. Given unit of work will be retried configured number of times with configured delay if it experiences `ServiceUnavailableException` or `SessionExpiredException`. This means it'll be retried when a network error happens of cluster member becomes unavailable.
1 parent 69555d0 commit 40df5df

29 files changed

+1610
-71
lines changed

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@
2828
import org.neo4j.driver.internal.net.SocketConnector;
2929
import org.neo4j.driver.internal.net.pooling.PoolSettings;
3030
import org.neo4j.driver.internal.net.pooling.SocketConnectionPool;
31+
import org.neo4j.driver.internal.retry.RetryDecision;
32+
import org.neo4j.driver.internal.retry.RetryLogic;
33+
import org.neo4j.driver.internal.retry.RetrySettings;
34+
import org.neo4j.driver.internal.retry.RetryWithDelay;
3135
import org.neo4j.driver.internal.security.SecurityPlan;
3236
import org.neo4j.driver.internal.spi.ConnectionPool;
3337
import org.neo4j.driver.internal.spi.ConnectionProvider;
@@ -47,15 +51,18 @@
4751

4852
public class DriverFactory
4953
{
50-
public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings routingSettings, Config config )
54+
public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings routingSettings,
55+
RetrySettings retrySettings, Config config )
5156
{
5257
BoltServerAddress address = BoltServerAddress.from( uri );
5358
SecurityPlan securityPlan = createSecurityPlan( address, config );
5459
ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, config );
60+
RetryLogic<RetryDecision> retryLogic = RetryWithDelay.create( retrySettings, createClock() );
5561

5662
try
5763
{
58-
return createDriver( address, uri.getScheme(), connectionPool, config, routingSettings, securityPlan );
64+
return createDriver( address, uri.getScheme(), connectionPool, config, routingSettings, securityPlan,
65+
retryLogic );
5966
}
6067
catch ( Throwable driverError )
6168
{
@@ -73,14 +80,15 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
7380
}
7481

7582
private Driver createDriver( BoltServerAddress address, String scheme, ConnectionPool connectionPool,
76-
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan )
83+
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan,
84+
RetryLogic<RetryDecision> retryLogic )
7785
{
7886
switch ( scheme.toLowerCase() )
7987
{
8088
case "bolt":
81-
return createDirectDriver( address, connectionPool, config, securityPlan );
89+
return createDirectDriver( address, connectionPool, config, securityPlan, retryLogic );
8290
case "bolt+routing":
83-
return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan );
91+
return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic );
8492
default:
8593
throw new ClientException( format( "Unsupported URI scheme: %s", scheme ) );
8694
}
@@ -92,10 +100,10 @@ private Driver createDriver( BoltServerAddress address, String scheme, Connectio
92100
* <b>This method is protected only for testing</b>
93101
*/
94102
protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config,
95-
SecurityPlan securityPlan )
103+
SecurityPlan securityPlan, RetryLogic<RetryDecision> retryLogic )
96104
{
97105
ConnectionProvider connectionProvider = new DirectConnectionProvider( address, connectionPool );
98-
SessionFactory sessionFactory = createSessionFactory( connectionProvider, config );
106+
SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config );
99107
return createDriver( config, securityPlan, sessionFactory );
100108
}
101109

@@ -105,14 +113,15 @@ protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool c
105113
* <b>This method is protected only for testing</b>
106114
*/
107115
protected Driver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool,
108-
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan )
116+
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan,
117+
RetryLogic<RetryDecision> retryLogic )
109118
{
110119
if ( !securityPlan.isRoutingCompatible() )
111120
{
112121
throw new IllegalArgumentException( "The chosen security plan is not compatible with a routing driver" );
113122
}
114123
ConnectionProvider connectionProvider = createLoadBalancer( address, connectionPool, config, routingSettings );
115-
SessionFactory sessionFactory = createSessionFactory( connectionProvider, config );
124+
SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config );
116125
return createDriver( config, securityPlan, sessionFactory );
117126
}
118127

@@ -180,9 +189,10 @@ protected Connector createConnector( ConnectionSettings connectionSettings, Secu
180189
* <p>
181190
* <b>This method is protected only for testing</b>
182191
*/
183-
protected SessionFactory createSessionFactory( ConnectionProvider connectionProvider, Config config )
192+
protected SessionFactory createSessionFactory( ConnectionProvider connectionProvider,
193+
RetryLogic<RetryDecision> retryLogic, Config config )
184194
{
185-
return new SessionFactoryImpl( connectionProvider, config, config.logging() );
195+
return new SessionFactoryImpl( connectionProvider, retryLogic, config );
186196
}
187197

188198
private static SecurityPlan createSecurityPlan( BoltServerAddress address, Config config )

driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.neo4j.driver.internal;
2020

21+
import org.neo4j.driver.internal.retry.RetryDecision;
22+
import org.neo4j.driver.internal.retry.RetryLogic;
2123
import org.neo4j.driver.internal.spi.ConnectionProvider;
2224
import org.neo4j.driver.v1.AccessMode;
2325
import org.neo4j.driver.v1.Logging;
@@ -28,9 +30,10 @@ class LeakLoggingNetworkSession extends NetworkSession
2830
{
2931
private final String stackTrace;
3032

31-
LeakLoggingNetworkSession( ConnectionProvider connectionProvider, AccessMode mode, Logging logging )
33+
LeakLoggingNetworkSession( ConnectionProvider connectionProvider, AccessMode mode,
34+
RetryLogic<RetryDecision> retryLogic, Logging logging )
3235
{
33-
super( connectionProvider, mode, logging );
36+
super( connectionProvider, mode, retryLogic, logging );
3437
this.stackTrace = captureStackTrace();
3538
}
3639

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,13 @@
1818
*/
1919
package org.neo4j.driver.internal;
2020

21+
import java.util.ArrayList;
22+
import java.util.List;
2123
import java.util.Map;
2224
import java.util.concurrent.atomic.AtomicBoolean;
2325

26+
import org.neo4j.driver.internal.retry.RetryDecision;
27+
import org.neo4j.driver.internal.retry.RetryLogic;
2428
import org.neo4j.driver.internal.spi.Connection;
2529
import org.neo4j.driver.internal.spi.ConnectionProvider;
2630
import org.neo4j.driver.internal.spi.PooledConnection;
@@ -45,6 +49,7 @@ public class NetworkSession implements Session, SessionResourcesHandler
4549
{
4650
private final ConnectionProvider connectionProvider;
4751
private final AccessMode mode;
52+
private final RetryLogic<RetryDecision> retryLogic;
4853
protected final Logger logger;
4954

5055
private String lastBookmark;
@@ -53,10 +58,12 @@ public class NetworkSession implements Session, SessionResourcesHandler
5358

5459
private final AtomicBoolean isOpen = new AtomicBoolean( true );
5560

56-
public NetworkSession( ConnectionProvider connectionProvider, AccessMode mode, Logging logging )
61+
public NetworkSession( ConnectionProvider connectionProvider, AccessMode mode, RetryLogic<RetryDecision> retryLogic,
62+
Logging logging )
5763
{
5864
this.connectionProvider = connectionProvider;
5965
this.mode = mode;
66+
this.retryLogic = retryLogic;
6067
this.logger = logging.getLog( "Session-" + hashCode() );
6168
}
6269

@@ -242,9 +249,29 @@ public synchronized void onConnectionError( boolean recoverable )
242249

243250
private synchronized <T> T transaction( AccessMode mode, Function<Transaction,T> work )
244251
{
245-
try ( Transaction tx = beginTransaction( mode ) )
252+
RetryDecision decision = null;
253+
List<Throwable> errors = null;
254+
255+
while ( true )
246256
{
247-
return work.apply( tx );
257+
try ( Transaction tx = beginTransaction( mode ) )
258+
{
259+
return work.apply( tx );
260+
}
261+
catch ( Throwable newError )
262+
{
263+
decision = retryLogic.apply( newError, decision );
264+
265+
if ( decision.shouldRetry() )
266+
{
267+
errors = recordError( newError, errors );
268+
}
269+
else
270+
{
271+
addSuppressed( newError, errors );
272+
throw newError;
273+
}
274+
}
248275
}
249276
}
250277

@@ -348,4 +375,28 @@ private void closeCurrentConnection( boolean sync )
348375
logger.debug( "Released connection " + connection.hashCode() );
349376
}
350377
}
378+
379+
private static List<Throwable> recordError( Throwable error, List<Throwable> errors )
380+
{
381+
if ( errors == null )
382+
{
383+
errors = new ArrayList<>();
384+
}
385+
errors.add( error );
386+
return errors;
387+
}
388+
389+
private static void addSuppressed( Throwable error, List<Throwable> suppressedErrors )
390+
{
391+
if ( suppressedErrors != null )
392+
{
393+
for ( Throwable suppressedError : suppressedErrors )
394+
{
395+
if ( error != suppressedError )
396+
{
397+
error.addSuppressed( suppressedError );
398+
}
399+
}
400+
}
401+
}
351402
}

driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.neo4j.driver.internal;
2020

21+
import org.neo4j.driver.internal.retry.RetryDecision;
22+
import org.neo4j.driver.internal.retry.RetryLogic;
2123
import org.neo4j.driver.internal.spi.ConnectionProvider;
2224
import org.neo4j.driver.v1.AccessMode;
2325
import org.neo4j.driver.v1.Config;
@@ -27,14 +29,16 @@
2729
public class SessionFactoryImpl implements SessionFactory
2830
{
2931
protected final ConnectionProvider connectionProvider;
32+
protected final RetryLogic<RetryDecision> retryLogic;
3033
protected final Logging logging;
3134
protected final boolean leakedSessionsLoggingEnabled;
3235

33-
SessionFactoryImpl( ConnectionProvider connectionProvider, Config config, Logging logging )
36+
SessionFactoryImpl( ConnectionProvider connectionProvider, RetryLogic<RetryDecision> retryLogic, Config config )
3437
{
3538
this.connectionProvider = connectionProvider;
3639
this.leakedSessionsLoggingEnabled = config.logLeakedSessions();
37-
this.logging = logging;
40+
this.retryLogic = retryLogic;
41+
this.logging = config.logging();
3842
}
3943

4044
@Override
@@ -43,11 +47,11 @@ public Session newInstance( AccessMode mode, String bookmark )
4347
NetworkSession session;
4448
if ( leakedSessionsLoggingEnabled )
4549
{
46-
session = new LeakLoggingNetworkSession( connectionProvider, mode, logging );
50+
session = new LeakLoggingNetworkSession( connectionProvider, mode, retryLogic, logging );
4751
}
4852
else
4953
{
50-
session = new NetworkSession( connectionProvider, mode, logging );
54+
session = new NetworkSession( connectionProvider, mode, retryLogic, logging );
5155
}
5256
session.setLastBookmark( bookmark );
5357
return session;
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.retry;
20+
21+
public interface RetryDecision
22+
{
23+
boolean shouldRetry();
24+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.retry;
20+
21+
public interface RetryLogic<T extends RetryDecision>
22+
{
23+
T apply( Throwable error, T previousDecision );
24+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.retry;
20+
21+
public final class RetrySettings
22+
{
23+
public static final int DEFAULT_MAX_ATTEMPTS = 3;
24+
public static final int DEFAULT_DELAY_MS = 2_000;
25+
public static final RetrySettings DEFAULT = new RetrySettings( DEFAULT_MAX_ATTEMPTS, DEFAULT_DELAY_MS );
26+
27+
private final int maxAttempts;
28+
private final long delayMs;
29+
30+
public RetrySettings( int maxAttempts, long delayMs )
31+
{
32+
this.maxAttempts = maxAttempts;
33+
this.delayMs = delayMs;
34+
}
35+
36+
public int maxAttempts()
37+
{
38+
return maxAttempts;
39+
}
40+
41+
public long delayMs()
42+
{
43+
return delayMs;
44+
}
45+
}

0 commit comments

Comments
 (0)