Skip to content

Commit 5ea1e69

Browse files
author
Zhen
committed
Mark a few [client message, server response messages] pairs as invalid message response pairs,
these message pairs are listed as follows: INIT <- IGNORED ACK_FAILURE <- IGNORED ACK_FAILURE <- FAILURE RESET <- IGNORED RESET <- FAILURE In case of seeing such message pairs, raise an unrecoverable error message on client side.
1 parent 225361d commit 5ea1e69

14 files changed

+229
-57
lines changed

driver/src/main/java/org/neo4j/driver/internal/InternalSession.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,8 +198,8 @@ private void ensureNoUnrecoverableError()
198198
{
199199
if( connection.hasUnrecoverableErrors() )
200200
{
201-
throw new ClientException( "Cannot run more statements in the current session as unrecoverable errors " +
202-
"has happened. Please close the currect session and re-run your statement in a" +
201+
throw new ClientException( "Cannot run more statements in the current session as an unrecoverable error " +
202+
"has happened. Please close the current session and re-run your statement in a" +
203203
" new session." );
204204
}
205205
}

driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -66,32 +66,14 @@ public InternalStatementResult( Connection connection, Statement statement )
6666

6767
private StreamCollector newRunResponseCollector()
6868
{
69-
return new StreamCollector()
69+
return new StreamCollector.NoOperationStreamCollector()
7070
{
7171
@Override
7272
public void keys( String[] names )
7373
{
7474
keys = Arrays.asList( names );
7575
}
7676

77-
@Override
78-
public void record( Value[] fields ) {}
79-
80-
@Override
81-
public void statementType( StatementType type ) {}
82-
83-
@Override
84-
public void statementStatistics( SummaryCounters statistics ) {}
85-
86-
@Override
87-
public void plan( Plan plan ) {}
88-
89-
@Override
90-
public void profile( ProfiledPlan plan ) {}
91-
92-
@Override
93-
public void notifications( List<Notification> notifications ) {}
94-
9577
@Override
9678
public void done()
9779
{
@@ -106,11 +88,8 @@ public void done()
10688
private StreamCollector newPullAllResponseCollector( Statement statement )
10789
{
10890
final SummaryBuilder summaryBuilder = new SummaryBuilder( statement );
109-
return new StreamCollector()
91+
return new StreamCollector.NoOperationStreamCollector()
11092
{
111-
@Override
112-
public void keys( String[] names ) {}
113-
11493
@Override
11594
public void record( Value[] fields )
11695
{

driver/src/main/java/org/neo4j/driver/internal/connector/ConcurrencyGuardingConnection.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,12 +100,12 @@ public void pullAll( StreamCollector collector )
100100
}
101101

102102
@Override
103-
public void reset( StreamCollector collector )
103+
public void reset()
104104
{
105105
try
106106
{
107107
markAsInUse();
108-
delegate.reset( collector );
108+
delegate.reset();
109109
}
110110
finally
111111
{
@@ -114,12 +114,12 @@ public void reset( StreamCollector collector )
114114
}
115115

116116
@Override
117-
public void ackFailure( StreamCollector collector )
117+
public void ackFailure()
118118
{
119119
try
120120
{
121121
markAsInUse();
122-
delegate.ackFailure( collector );
122+
delegate.ackFailure();
123123
}
124124
finally
125125
{

driver/src/main/java/org/neo4j/driver/internal/connector/socket/SocketClient.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -129,9 +129,7 @@ public void receiveOne( SocketResponseHandler handler ) throws IOException
129129
{
130130
reader.read( handler );
131131

132-
// TODO: all the errors come from the following trace should result in the termination of this channel
133-
// https://github.com/neo4j/neo4j/blob/3
134-
// .0/community/bolt/src/main/java/org/neo4j/bolt/v1/transport/BoltProtocolV1.java#L86
132+
// Stop immediately if bolt protocol error happened on the server
135133
if ( handler.protocolViolationErrorOccurred() )
136134
{
137135
stop();

driver/src/main/java/org/neo4j/driver/internal/connector/socket/SocketConnection.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public SocketConnection( String host, int port, Config config )
6767
@Override
6868
public void init( String clientName, Map<String,Value> authToken )
6969
{
70-
queueMessage( new InitMessage( clientName, authToken ), StreamCollector.NO_OP );
70+
queueMessage( new InitMessage( clientName, authToken ), StreamCollector.INIT );
7171
sync();
7272
}
7373

@@ -90,15 +90,15 @@ public void pullAll( StreamCollector collector )
9090
}
9191

9292
@Override
93-
public void reset( StreamCollector collector )
93+
public void reset()
9494
{
95-
queueMessage( RESET, collector );
95+
queueMessage( RESET, StreamCollector.RESET );
9696
}
9797

9898
@Override
99-
public void ackFailure( StreamCollector collector )
99+
public void ackFailure()
100100
{
101-
queueMessage( ACK_FAILURE, collector );
101+
queueMessage( ACK_FAILURE, StreamCollector.ACK_FAILURE );
102102
}
103103

104104
@Override

driver/src/main/java/org/neo4j/driver/internal/connector/socket/SocketResponseHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public void handleFailureMessage( String code, String message )
7676
}
7777
if ( collector != null )
7878
{
79-
collector.done();
79+
collector.doneFailure( error );
8080
}
8181
}
8282

@@ -90,7 +90,7 @@ public void handleSuccessMessage( Map<String,Value> meta )
9090
collectPlan( collector, meta.get( "plan" ) );
9191
collectProfile( collector, meta.get( "profile" ) );
9292
collectNotifications( collector, meta.get( "notifications" ) );
93-
collector.done();
93+
collector.doneSuccess();
9494
}
9595

9696
private void collectNotifications( StreamCollector collector, Value notifications )
@@ -178,7 +178,7 @@ public void handleIgnoredMessage()
178178
StreamCollector collector = collectors.remove();
179179
if (collector != null)
180180
{
181-
collector.done();
181+
collector.doneIgnored();
182182
}
183183
}
184184

driver/src/main/java/org/neo4j/driver/internal/messaging/PackStreamMessageFormatV1.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ private void packValue( Value value ) throws IOException
327327
break;
328328

329329
default:
330-
throw new UnsupportedOperationException( "Unknown type: " + value );
330+
throw new IOException( "Unknown type: " + value );
331331
}
332332
}
333333

driver/src/main/java/org/neo4j/driver/internal/pool/PooledConnection.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -124,11 +124,11 @@ public void pullAll( StreamCollector collector )
124124
}
125125

126126
@Override
127-
public void reset( StreamCollector collector )
127+
public void reset()
128128
{
129129
try
130130
{
131-
delegate.reset( collector );
131+
delegate.reset();
132132
}
133133
catch ( RuntimeException e )
134134
{
@@ -137,11 +137,11 @@ public void reset( StreamCollector collector )
137137
}
138138

139139
@Override
140-
public void ackFailure( StreamCollector collector )
140+
public void ackFailure()
141141
{
142142
try
143143
{
144-
delegate.ackFailure( collector );
144+
delegate.ackFailure();
145145
}
146146
catch ( RuntimeException e )
147147
{
@@ -230,7 +230,7 @@ private void onDelegateException( RuntimeException e )
230230
}
231231
else
232232
{
233-
ackFailure( StreamCollector.NO_OP );
233+
ackFailure();
234234
}
235235
if( onError != null )
236236
{

driver/src/main/java/org/neo4j/driver/internal/pool/PooledConnectionReleaseConsumer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ else if ( driverStopped.get() )
8888
boolean validConnection( PooledConnection pooledConnection )
8989
{
9090
// once the pooledConn has marked to have unrecoverable errors, there is no way to remove the error
91-
// and we should close the conn without bother to reset the conn at all
91+
// and we should close the conn without bothering to reset the conn at all
9292
return !pooledConnection.hasUnrecoverableErrors() &&
9393
reset(pooledConnection) &&
9494
(pooledConnection.idleTime() <= minIdleBeforeConnectionTest || ping( pooledConnection ));
@@ -104,7 +104,7 @@ private boolean reset( PooledConnection conn )
104104
{
105105
try
106106
{
107-
conn.reset( StreamCollector.NO_OP );
107+
conn.reset();
108108
conn.sync();
109109
return true;
110110
}

driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,16 +55,17 @@ public interface Connection extends AutoCloseable
5555
void pullAll( StreamCollector collector );
5656

5757
/**
58-
* Queue a reset action, output will be handed to the collector once the pull starts. This will
58+
* Queue a reset action, throw {@link org.neo4j.driver.v1.exceptions.ClientException} if an ignored message is received. This will
5959
* close the stream once its completed, allowing another {@link #run(String, java.util.Map, StreamCollector) run}
6060
*/
61-
void reset( StreamCollector collector );
61+
void reset();
6262

6363
/**
64-
* Queue a reset action, output will be handed to the collector once the pull starts. This will
65-
* close the stream once its completed, allowing another {@link #run(String, java.util.Map, StreamCollector) run}
64+
* Queue a ack_failure action, valid output could only be success. Throw {@link org.neo4j.driver.v1.exceptions.ClientException} if
65+
* a failure or ignored message is received. This will close the stream once it is completed, allowing another
66+
* {@link #run(String, java.util.Map, StreamCollector) run}
6667
*/
67-
void ackFailure( StreamCollector collector );
68+
void ackFailure();
6869

6970
/**
7071
* Ensure all outstanding actions are carried out on the server.

0 commit comments

Comments
 (0)