Skip to content

Commit 9776f6e

Browse files
committed
Preserve ordering of servers in routing table
This commit makes routing table use same ordering of read, write and route servers as returned by the `getServers()` procedure call. Database uses lists to represent these servers and randomly shuffles them before returning, so it makes sense using the provided randomized view. Is also makes boltkit tests more predictable and easy to reason about.
1 parent 40df5df commit 9776f6e

File tree

6 files changed

+288
-78
lines changed

6 files changed

+288
-78
lines changed

driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterComposition.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
*/
1919
package org.neo4j.driver.internal.cluster;
2020

21-
import java.util.HashSet;
21+
import java.util.LinkedHashSet;
2222
import java.util.Set;
2323

2424
import org.neo4j.driver.internal.net.BoltServerAddress;
@@ -46,9 +46,9 @@ public BoltServerAddress apply( Value value )
4646

4747
private ClusterComposition( long expirationTimestamp )
4848
{
49-
this.readers = new HashSet<>();
50-
this.writers = new HashSet<>();
51-
this.routers = new HashSet<>();
49+
this.readers = new LinkedHashSet<>();
50+
this.writers = new LinkedHashSet<>();
51+
this.routers = new LinkedHashSet<>();
5252
this.expirationTimestamp = expirationTimestamp;
5353
}
5454

@@ -69,24 +69,25 @@ public boolean hasWriters()
6969
{
7070
return !writers.isEmpty();
7171
}
72+
7273
public boolean hasRoutersAndReaders()
7374
{
74-
return routers.isEmpty() || readers.isEmpty();
75+
return !routers.isEmpty() && !readers.isEmpty();
7576
}
7677

7778
public Set<BoltServerAddress> readers()
7879
{
79-
return new HashSet<>( readers );
80+
return new LinkedHashSet<>( readers );
8081
}
8182

8283
public Set<BoltServerAddress> writers()
8384
{
84-
return new HashSet<>( writers );
85+
return new LinkedHashSet<>( writers );
8586
}
8687

8788
public Set<BoltServerAddress> routers()
8889
{
89-
return new HashSet<>( routers );
90+
return new LinkedHashSet<>( routers );
9091
}
9192

9293
public long expirationTimestamp() {

driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.neo4j.driver.internal.cluster;
2121

2222
import java.util.HashSet;
23+
import java.util.LinkedHashSet;
2324
import java.util.Set;
2425

2526
import org.neo4j.driver.internal.net.BoltServerAddress;
@@ -41,7 +42,7 @@ public class ClusterRoutingTable implements RoutingTable
4142
public ClusterRoutingTable( Clock clock, BoltServerAddress... routingAddresses )
4243
{
4344
this( clock );
44-
routers.update( new HashSet<>( asList( routingAddresses ) ), new HashSet<BoltServerAddress>() );
45+
routers.update( new LinkedHashSet<>( asList( routingAddresses ) ), new HashSet<BoltServerAddress>() );
4546
}
4647

4748
private ClusterRoutingTable( Clock clock )
@@ -67,7 +68,7 @@ public boolean isStale()
6768
public synchronized Set<BoltServerAddress> update( ClusterComposition cluster )
6869
{
6970
expirationTimeout = cluster.expirationTimestamp();
70-
HashSet<BoltServerAddress> removed = new HashSet<>();
71+
Set<BoltServerAddress> removed = new HashSet<>();
7172
readers.update( cluster.readers(), removed );
7273
writers.update( cluster.writers(), removed );
7374
routers.update( cluster.routers(), removed );

driver/src/main/java/org/neo4j/driver/internal/cluster/GetServersProcedureClusterCompositionProvider.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public GetServersProcedureClusterCompositionProvider( Clock clock, Logger log )
5656
@Override
5757
public ClusterCompositionResponse getClusterComposition( Connection connection )
5858
{
59-
List<Record> records = null;
59+
List<Record> records;
6060

6161
// failed to invoke procedure
6262
try
@@ -94,7 +94,7 @@ public ClusterCompositionResponse getClusterComposition( Connection connection )
9494
}
9595

9696
// the cluster result is not a legal reply
97-
if ( cluster.hasRoutersAndReaders() )
97+
if ( !cluster.hasRoutersAndReaders() )
9898
{
9999
return new ClusterCompositionResponse.Failure( new ProtocolException( format(
100100
"%sno router or reader found in response.", PROTOCOL_ERROR_MESSAGE ) ) );
Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.cluster;
20+
21+
import org.junit.Test;
22+
23+
import java.util.ArrayList;
24+
import java.util.HashMap;
25+
import java.util.LinkedHashSet;
26+
import java.util.List;
27+
import java.util.Map;
28+
import java.util.Set;
29+
30+
import org.neo4j.driver.internal.InternalRecord;
31+
import org.neo4j.driver.internal.net.BoltServerAddress;
32+
import org.neo4j.driver.v1.Record;
33+
import org.neo4j.driver.v1.Value;
34+
35+
import static java.util.Arrays.asList;
36+
import static org.hamcrest.Matchers.contains;
37+
import static org.junit.Assert.assertEquals;
38+
import static org.junit.Assert.assertFalse;
39+
import static org.junit.Assert.assertThat;
40+
import static org.junit.Assert.assertTrue;
41+
import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.A;
42+
import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.B;
43+
import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.C;
44+
import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.D;
45+
import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.E;
46+
import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.F;
47+
import static org.neo4j.driver.v1.Values.value;
48+
49+
public class ClusterCompositionTest
50+
{
51+
@Test
52+
public void hasWritersReturnsFalseWhenNoWriters()
53+
{
54+
ClusterComposition composition = newComposition( 1, addresses( A, B ), addresses(), addresses( C, D ) );
55+
56+
assertFalse( composition.hasWriters() );
57+
}
58+
59+
@Test
60+
public void hasWritersReturnsTrueWhenSomeWriters()
61+
{
62+
ClusterComposition composition = newComposition( 1, addresses( A, B ), addresses( C, D ), addresses( E, F ) );
63+
64+
assertTrue( composition.hasWriters() );
65+
}
66+
67+
@Test
68+
public void hasRoutersAndReadersReturnsFalseWhenNoRouters()
69+
{
70+
ClusterComposition composition = newComposition( 1, addresses( A, B ), addresses( C, D ), addresses() );
71+
72+
assertFalse( composition.hasRoutersAndReaders() );
73+
}
74+
75+
@Test
76+
public void hasRoutersAndReadersReturnsFalseWhenNoReaders()
77+
{
78+
ClusterComposition composition = newComposition( 1, addresses(), addresses( A, B ), addresses( C, D ) );
79+
80+
assertFalse( composition.hasRoutersAndReaders() );
81+
}
82+
83+
@Test
84+
public void hasRoutersAndReadersWhenSomeReadersAndRouters()
85+
{
86+
ClusterComposition composition = newComposition( 1, addresses( A, B ), addresses( C, D ), addresses( E, F ) );
87+
88+
assertTrue( composition.hasRoutersAndReaders() );
89+
}
90+
91+
@Test
92+
public void readersWhenEmpty()
93+
{
94+
ClusterComposition composition = newComposition( 1, addresses(), addresses( A, B ), addresses( C, D ) );
95+
96+
assertEquals( 0, composition.readers().size() );
97+
}
98+
99+
@Test
100+
public void writersWhenEmpty()
101+
{
102+
ClusterComposition composition = newComposition( 1, addresses( A, B ), addresses(), addresses( C, D ) );
103+
104+
assertEquals( 0, composition.writers().size() );
105+
}
106+
107+
@Test
108+
public void routersWhenEmpty()
109+
{
110+
ClusterComposition composition = newComposition( 1, addresses( A, B ), addresses( C, D ), addresses() );
111+
112+
assertEquals( 0, composition.routers().size() );
113+
}
114+
115+
@Test
116+
public void readersWhenNonEmpty()
117+
{
118+
ClusterComposition composition = newComposition( 1, addresses( A, B ), addresses( C, D ), addresses( E, F ) );
119+
120+
assertEquals( addresses( A, B ), composition.readers() );
121+
}
122+
123+
@Test
124+
public void writersWhenNonEmpty()
125+
{
126+
ClusterComposition composition = newComposition( 1, addresses( A, B ), addresses( C, D ), addresses( E, F ) );
127+
128+
assertEquals( addresses( C, D ), composition.writers() );
129+
}
130+
131+
@Test
132+
public void routersWhenNonEmpty()
133+
{
134+
ClusterComposition composition = newComposition( 1, addresses( A, B ), addresses( C, D ), addresses( E, F ) );
135+
136+
assertEquals( addresses( E, F ), composition.routers() );
137+
}
138+
139+
@Test
140+
public void expirationTimestamp()
141+
{
142+
ClusterComposition composition = newComposition( 42, addresses( A, B ), addresses( C, D ), addresses( E, F ) );
143+
144+
assertEquals( 42, composition.expirationTimestamp() );
145+
}
146+
147+
@Test
148+
public void parseCorrectRecord()
149+
{
150+
Value[] values = {
151+
value( 42L ),
152+
value( asList( serversEntry( "READ", A, B ),
153+
serversEntry( "WRITE", C, D ),
154+
serversEntry( "ROUTE", E, F ) ) )
155+
};
156+
Record record = new InternalRecord( asList( "ttl", "servers" ), values );
157+
158+
ClusterComposition composition = ClusterComposition.parse( record, 0 );
159+
160+
// TTL is received in seconds and is converted to millis
161+
assertEquals( 42_000, composition.expirationTimestamp() );
162+
163+
assertEquals( addresses( A, B ), composition.readers() );
164+
assertEquals( addresses( C, D ), composition.writers() );
165+
assertEquals( addresses( E, F ), composition.routers() );
166+
}
167+
168+
@Test
169+
public void parsePreservesOrderOfReaders()
170+
{
171+
Value[] values = {
172+
value( 42L ),
173+
value( asList( serversEntry( "READ", A, C, E, B, F, D ),
174+
serversEntry( "WRITE" ),
175+
serversEntry( "ROUTE" ) ) )
176+
};
177+
Record record = new InternalRecord( asList( "ttl", "servers" ), values );
178+
179+
ClusterComposition composition = ClusterComposition.parse( record, 0 );
180+
181+
assertThat( composition.readers(), contains( A, C, E, B, F, D ) );
182+
assertEquals( 0, composition.writers().size() );
183+
assertEquals( 0, composition.routers().size() );
184+
}
185+
186+
@Test
187+
public void parsePreservesOrderOfWriters()
188+
{
189+
Value[] values = {
190+
value( 42L ),
191+
value( asList( serversEntry( "READ" ),
192+
serversEntry( "WRITE", C, F, D, A, B, E ),
193+
serversEntry( "ROUTE" ) ) )
194+
};
195+
Record record = new InternalRecord( asList( "ttl", "servers" ), values );
196+
197+
ClusterComposition composition = ClusterComposition.parse( record, 0 );
198+
199+
assertEquals( 0, composition.readers().size() );
200+
assertThat( composition.writers(), contains( C, F, D, A, B, E ) );
201+
assertEquals( 0, composition.routers().size() );
202+
}
203+
204+
@Test
205+
public void parsePreservesOrderOfRouters()
206+
{
207+
Value[] values = {
208+
value( 42L ),
209+
value( asList( serversEntry( "READ" ),
210+
serversEntry( "WRITE" ),
211+
serversEntry( "ROUTE", F, D, A, B, C, E ) ) )
212+
};
213+
Record record = new InternalRecord( asList( "ttl", "servers" ), values );
214+
215+
ClusterComposition composition = ClusterComposition.parse( record, 0 );
216+
217+
assertEquals( 0, composition.readers().size() );
218+
assertEquals( 0, composition.writers().size() );
219+
assertThat( composition.routers(), contains( F, D, A, B, C, E ) );
220+
}
221+
222+
private static ClusterComposition newComposition( long expirationTimestamp, Set<BoltServerAddress> readers,
223+
Set<BoltServerAddress> writers, Set<BoltServerAddress> routers )
224+
{
225+
return new ClusterComposition( expirationTimestamp, readers, writers, routers );
226+
}
227+
228+
private static Set<BoltServerAddress> addresses( BoltServerAddress... elements )
229+
{
230+
return new LinkedHashSet<>( asList( elements ) );
231+
}
232+
233+
private static Map<String,Object> serversEntry( String role, BoltServerAddress... addresses )
234+
{
235+
Map<String,Object> map = new HashMap<>();
236+
map.put( "role", role );
237+
List<String> addressStrings = new ArrayList<>();
238+
for ( BoltServerAddress address : addresses )
239+
{
240+
addressStrings.add( address.toString() );
241+
}
242+
map.put( "addresses", addressStrings );
243+
return map;
244+
}
245+
}

driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterRoutingTableTest.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.neo4j.driver.internal.util.FakeClock;
2424

2525
import static java.util.Arrays.asList;
26+
import static org.junit.Assert.assertEquals;
2627
import static org.junit.Assert.assertFalse;
2728
import static org.junit.Assert.assertTrue;
2829
import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.A;
@@ -31,6 +32,7 @@
3132
import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.D;
3233
import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.E;
3334
import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.EMPTY;
35+
import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.F;
3436
import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.createClusterComposition;
3537

3638
public class ClusterRoutingTableTest
@@ -79,7 +81,6 @@ public void shouldReturnStaleIfNoReader() throws Exception
7981
assertTrue( routingTable.isStale() );
8082
}
8183

82-
8384
@Test
8485
public void shouldReturnStatleIfNoWriter() throws Exception
8586
{
@@ -120,4 +121,17 @@ public void shouldStaleWhenCreate() throws Throwable
120121
// Then
121122
assertTrue( routingTable.isStale() );
122123
}
124+
125+
@Test
126+
public void preservesOrderingOfRouters()
127+
{
128+
ClusterRoutingTable routingTable = new ClusterRoutingTable( new FakeClock(), A, C, D, F, B, E );
129+
130+
assertEquals( A, routingTable.nextRouter() );
131+
assertEquals( C, routingTable.nextRouter() );
132+
assertEquals( D, routingTable.nextRouter() );
133+
assertEquals( F, routingTable.nextRouter() );
134+
assertEquals( B, routingTable.nextRouter() );
135+
assertEquals( E, routingTable.nextRouter() );
136+
}
123137
}

0 commit comments

Comments
 (0)