Skip to content

Commit

Permalink
RATIS-62. Return the Exception from StateMachine#preAppendTransaction…
Browse files Browse the repository at this point in the history
… to client as StateMachineException. Contributed by Jing Zhao.
  • Loading branch information
Jing9 committed Mar 31, 2017
1 parent 42fff2b commit 24f5cc7
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
public TransactionContext preAppendTransaction(TransactionContext trx)
throws IOException {
if (failPreAppend) {
throw new IOException("Fake Exception");
throw new IOException("Fake Exception in preAppend");
} else {
return trx;
}
Expand Down Expand Up @@ -170,25 +170,18 @@ public void testRetryOnExceptionDuringReplication() throws Exception {
final long callId = 999;
RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId,
callId, new RaftTestUtil.SimpleMessage("message"));
try {
rpc.sendRequest(r);
Assert.fail("Exception expected");
} catch (Exception e) {
e.printStackTrace();
}
RaftClientReply reply = rpc.sendRequest(r);
Assert.assertTrue(reply.hasStateMachineException());

RetryCache.CacheEntry oldEntry = RaftServerTestUtil.getRetryEntry(
cluster.getLeader(), client.getId(), callId);
Assert.assertNotNull(oldEntry);
Assert.assertTrue(RaftServerTestUtil.isRetryCacheEntryFailed(oldEntry));

// retry
try {
rpc.sendRequest(r);
Assert.fail("Exception expected");
} catch (Exception e) {
e.printStackTrace();
}
reply = rpc.sendRequest(r);
Assert.assertTrue(reply.hasStateMachineException());

RetryCache.CacheEntry currentEntry = RaftServerTestUtil.getRetryEntry(
cluster.getLeader(), client.getId(), callId);
Assert.assertNotNull(currentEntry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,14 +386,14 @@ NotLeaderException generateNotLeaderException() {
*/
private CompletableFuture<RaftClientReply> appendTransaction(
RaftClientRequest request, TransactionContext context,
RetryCache.CacheEntry retryEntry) throws RaftException {
RetryCache.CacheEntry cacheEntry) throws RaftException {
LOG.debug("{}: receive client request({})", getId(), request);
lifeCycle.assertCurrentState(RUNNING);
CompletableFuture<RaftClientReply> reply;

final PendingRequest pending;
synchronized (this) {
reply = checkLeaderState(request, retryEntry);
reply = checkLeaderState(request, cacheEntry);
if (reply != null) {
return reply;
}
Expand All @@ -403,13 +403,12 @@ private CompletableFuture<RaftClientReply> appendTransaction(
try {
entryIndex = state.applyLog(context, request.getClientId(),
request.getCallId());
} catch (IOException e) {
// TODO looks like the IOException is actually only thrown by the SM in
// the preAppend stage. In that case we should wrap the exception in
// StateMachineException and return the exception in a RaftClientReply.
RaftException re = new RaftException(e);
retryEntry.failWithException(re);
throw re;
} catch (StateMachineException e) {
// the StateMachineException is thrown by the SM in the preAppend stage.
// Return the exception in a RaftClientReply.
RaftClientReply exceptionReply = new RaftClientReply(request, e);
cacheEntry.failWithReply(exceptionReply);
return CompletableFuture.completedFuture(exceptionReply);
}

// put the request into the pending queue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.StateMachineException;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.*;
Expand Down Expand Up @@ -196,7 +197,7 @@ public RaftLog getLog() {
}

long applyLog(TransactionContext operation, ClientId clientId, long callId)
throws IOException {
throws StateMachineException {
return log.append(currentTerm, operation, clientId, callId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.StateMachineException;
import org.apache.ratis.server.impl.ConfigurationManager;
import org.apache.ratis.server.impl.RaftConfiguration;
import org.apache.ratis.server.impl.RaftServerConstants;
Expand Down Expand Up @@ -126,14 +127,18 @@ public long getNextIndex() {
* @return the index of the new log entry.
*/
public long append(long term, TransactionContext operation,
ClientId clientId, long callId) throws IOException {
ClientId clientId, long callId) throws StateMachineException {
checkLogState();
try(AutoCloseableLock writeLock = writeLock()) {
final long nextIndex = getNextIndex();

// This is called here to guarantee strict serialization of callback executions in case
// the SM wants to attach a logic depending on ordered execution in the log commit order.
operation = operation.preAppendTransaction();
try {
operation = operation.preAppendTransaction();
} catch (IOException e) {
throw new StateMachineException(selfId.toString(), e);
}

// build the log entry after calling the StateMachine
final LogEntryProto e = ProtoUtils.toLogEntryProto(
Expand Down

0 comments on commit 24f5cc7

Please sign in to comment.