From 24f5cc769f6db591d9baa105579ba41725943fe8 Mon Sep 17 00:00:00 2001 From: Jing Zhao Date: Fri, 31 Mar 2017 15:16:54 -0700 Subject: [PATCH] RATIS-62. Return the Exception from StateMachine#preAppendTransaction to client as StateMachineException. Contributed by Jing Zhao. --- .../TestRaftStateMachineException.java | 19 ++++++------------- .../ratis/server/impl/RaftServerImpl.java | 17 ++++++++--------- .../apache/ratis/server/impl/ServerState.java | 3 ++- .../apache/ratis/server/storage/RaftLog.java | 9 +++++++-- 4 files changed, 23 insertions(+), 25 deletions(-) diff --git a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java index d8a32f6ab4..5ce10e2d46 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java +++ b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java @@ -72,7 +72,7 @@ public CompletableFuture applyTransaction(TransactionContext trx) { public TransactionContext preAppendTransaction(TransactionContext trx) throws IOException { if (failPreAppend) { - throw new IOException("Fake Exception"); + throw new IOException("Fake Exception in preAppend"); } else { return trx; } @@ -170,12 +170,8 @@ public void testRetryOnExceptionDuringReplication() throws Exception { final long callId = 999; RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId, callId, new RaftTestUtil.SimpleMessage("message")); - try { - rpc.sendRequest(r); - Assert.fail("Exception expected"); - } catch (Exception e) { - e.printStackTrace(); - } + RaftClientReply reply = rpc.sendRequest(r); + Assert.assertTrue(reply.hasStateMachineException()); RetryCache.CacheEntry oldEntry = RaftServerTestUtil.getRetryEntry( cluster.getLeader(), client.getId(), callId); @@ -183,12 +179,9 @@ public void testRetryOnExceptionDuringReplication() throws Exception { Assert.assertTrue(RaftServerTestUtil.isRetryCacheEntryFailed(oldEntry)); // retry - try { - rpc.sendRequest(r); - Assert.fail("Exception expected"); - } catch (Exception e) { - e.printStackTrace(); - } + reply = rpc.sendRequest(r); + Assert.assertTrue(reply.hasStateMachineException()); + RetryCache.CacheEntry currentEntry = RaftServerTestUtil.getRetryEntry( cluster.getLeader(), client.getId(), callId); Assert.assertNotNull(currentEntry); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index ca1b6d3d5a..6a1fbd4edf 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -386,14 +386,14 @@ NotLeaderException generateNotLeaderException() { */ private CompletableFuture appendTransaction( RaftClientRequest request, TransactionContext context, - RetryCache.CacheEntry retryEntry) throws RaftException { + RetryCache.CacheEntry cacheEntry) throws RaftException { LOG.debug("{}: receive client request({})", getId(), request); lifeCycle.assertCurrentState(RUNNING); CompletableFuture reply; final PendingRequest pending; synchronized (this) { - reply = checkLeaderState(request, retryEntry); + reply = checkLeaderState(request, cacheEntry); if (reply != null) { return reply; } @@ -403,13 +403,12 @@ private CompletableFuture appendTransaction( try { entryIndex = state.applyLog(context, request.getClientId(), request.getCallId()); - } catch (IOException e) { - // TODO looks like the IOException is actually only thrown by the SM in - // the preAppend stage. In that case we should wrap the exception in - // StateMachineException and return the exception in a RaftClientReply. - RaftException re = new RaftException(e); - retryEntry.failWithException(re); - throw re; + } catch (StateMachineException e) { + // the StateMachineException is thrown by the SM in the preAppend stage. + // Return the exception in a RaftClientReply. + RaftClientReply exceptionReply = new RaftClientReply(request, e); + cacheEntry.failWithReply(exceptionReply); + return CompletableFuture.completedFuture(exceptionReply); } // put the request into the pending queue diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java index 5cd0ee922d..ff75237802 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java @@ -20,6 +20,7 @@ import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.protocol.StateMachineException; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.*; @@ -196,7 +197,7 @@ public RaftLog getLog() { } long applyLog(TransactionContext operation, ClientId clientId, long callId) - throws IOException { + throws StateMachineException { return log.append(currentTerm, operation, clientId, callId); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java index 243da73171..77e554b06f 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java @@ -24,6 +24,7 @@ import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.protocol.StateMachineException; import org.apache.ratis.server.impl.ConfigurationManager; import org.apache.ratis.server.impl.RaftConfiguration; import org.apache.ratis.server.impl.RaftServerConstants; @@ -126,14 +127,18 @@ public long getNextIndex() { * @return the index of the new log entry. */ public long append(long term, TransactionContext operation, - ClientId clientId, long callId) throws IOException { + ClientId clientId, long callId) throws StateMachineException { checkLogState(); try(AutoCloseableLock writeLock = writeLock()) { final long nextIndex = getNextIndex(); // This is called here to guarantee strict serialization of callback executions in case // the SM wants to attach a logic depending on ordered execution in the log commit order. - operation = operation.preAppendTransaction(); + try { + operation = operation.preAppendTransaction(); + } catch (IOException e) { + throw new StateMachineException(selfId.toString(), e); + } // build the log entry after calling the StateMachine final LogEntryProto e = ProtoUtils.toLogEntryProto(