diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java index c24308fb67e..215df45f2de 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java @@ -1540,14 +1540,16 @@ public ReplyHeader submitRequest( * Wait for request completion with timeout. */ private void waitForPacketFinish(ReplyHeader r, Packet packet) throws InterruptedException { - long waitStartTime = Time.currentElapsedTime(); - while (!packet.finished) { - packet.wait(requestTimeout); - if (!packet.finished && ((Time.currentElapsedTime() - waitStartTime) >= requestTimeout)) { - LOG.error("Timeout error occurred for the packet '{}'.", packet); - r.setErr(Code.REQUESTTIMEOUT.intValue()); - break; - } + long remainingTime = requestTimeout; + while (!packet.finished && remainingTime > 0) { + long waitStartTime = Time.currentElapsedTime(); + packet.wait(remainingTime); + remainingTime -= (Time.currentElapsedTime() - waitStartTime); + } + + if (!packet.finished) { + LOG.error("Timeout error occurred for the packet '{}'.", packet); + r.setErr(Code.REQUESTTIMEOUT.intValue()); } } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java index 27bc02df785..93f801cabc2 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java @@ -19,13 +19,21 @@ package org.apache.zookeeper; import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThan; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import java.io.IOException; +import java.util.concurrent.TimeUnit; +import org.apache.jute.Record; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.client.HostProvider; import org.apache.zookeeper.client.ZKClientConfig; +import org.apache.zookeeper.common.Time; +import org.apache.zookeeper.proto.ReplyHeader; +import org.apache.zookeeper.proto.RequestHeader; import org.apache.zookeeper.server.quorum.QuorumPeerTestBase; import org.apache.zookeeper.test.ClientBase; import org.apache.zookeeper.test.ClientBase.CountdownWatcher; @@ -37,6 +45,9 @@ public class ClientRequestTimeoutTest extends QuorumPeerTestBase { private static final int SERVER_COUNT = 3; private boolean dropPacket = false; private int dropPacketType = ZooDefs.OpCode.create; + private boolean capturePacket = false; + private int capturePacketType = ZooDefs.OpCode.create; + private ClientCnxn.Packet capturedPacket = null; @Test @Timeout(value = 120) @@ -94,6 +105,105 @@ public void testClientRequestTimeout() throws Exception { } } + @Test + void testClientRequestTimeoutTime() throws Exception { + long requestTimeout = TimeUnit.SECONDS.toMillis(5); + System.setProperty("zookeeper.request.timeout", Long.toString(requestTimeout)); + + CustomZooKeeper zk = null; + int clientPort = PortAssignment.unique(); + MainThread mainThread = new MainThread(0, clientPort, "", false); + mainThread.start(); + try { + assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPort, CONNECTION_TIMEOUT), + "waiting for server 0 being up"); + + CountdownWatcher watch = new CountdownWatcher(); + zk = new CustomZooKeeper(getCxnString(new int[]{clientPort}), ClientBase.CONNECTION_TIMEOUT, watch); + watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT); + + dropPacket = true; + dropPacketType = ZooDefs.OpCode.create; + + String data = "originalData"; + long startTime = Time.currentElapsedTime(); + try { + zk.create("/testClientRequestTimeout", data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); + fail("KeeperException is expected."); + } catch (KeeperException exception) { + long cost = Time.currentElapsedTime() - startTime; + assertEquals(KeeperException.Code.REQUESTTIMEOUT, exception.code()); + LOG.info("testClientRequestTimeoutTime cost:{}", cost); + assertThat(cost, greaterThanOrEqualTo(requestTimeout)); + assertThat(cost, lessThan(requestTimeout + 500)); + } + } finally { + mainThread.shutdown(); + if (zk != null) { + zk.close(); + } + } + } + + + @Test + void testClientRequestTimeoutTimeSimulatingSpuriousWakeup() throws Exception { + long requestTimeout = TimeUnit.SECONDS.toMillis(5); + System.setProperty("zookeeper.request.timeout", Long.toString(requestTimeout)); + + CustomZooKeeper zk = null; + int clientPort = PortAssignment.unique(); + MainThread mainThread = new MainThread(0, clientPort, "", false); + mainThread.start(); + try { + assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPort, CONNECTION_TIMEOUT), + "waiting for server 0 being up"); + + CountdownWatcher watch = new CountdownWatcher(); + zk = new CustomZooKeeper(getCxnString(new int[]{clientPort}), ClientBase.CONNECTION_TIMEOUT, watch); + watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT); + + dropPacket = true; + dropPacketType = ZooDefs.OpCode.create; + capturePacket = true; + capturePacketType = ZooDefs.OpCode.create; + + // Simulating spurious wakeup + new Thread(() -> { + try { + TimeUnit.MILLISECONDS.sleep(requestTimeout / 2); + if (capturedPacket != null) { + synchronized (capturedPacket) { + capturedPacket.notifyAll(); + } + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }).start(); + + String data = "originalData"; + long startTime = Time.currentElapsedTime(); + try { + zk.create("/testClientRequestTimeout", data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); + fail("KeeperException is expected."); + } catch (KeeperException exception) { + long cost = Time.currentElapsedTime() - startTime; + assertEquals(KeeperException.Code.REQUESTTIMEOUT, exception.code()); + LOG.info("testClientRequestTimeoutTimeSimulatingSpuriousWakeup cost:{}", cost); + assertThat(cost, greaterThanOrEqualTo(requestTimeout)); + assertThat(cost, lessThan(requestTimeout + 500)); + } + } finally { + capturePacket = false; + capturedPacket = null; + mainThread.shutdown(); + if (zk != null) { + zk.close(); + } + } + } + /** * @return connection string in the form of * 127.0.0.1:port1,127.0.0.1:port2,127.0.0.1:port3 @@ -143,6 +253,27 @@ public void finishPacket(Packet p) { super.finishPacket(p); } + @Override + public Packet queuePacket( + RequestHeader h, + ReplyHeader r, + Record request, + Record response, + AsyncCallback cb, + String clientPath, + String serverPath, + Object ctx, + ZooKeeper.WatchRegistration watchRegistration, + WatchDeregistration watchDeregistration) { + Packet packet = super.queuePacket(h, r, request, response, cb, clientPath, serverPath, + ctx, watchRegistration, watchDeregistration); + + if (capturePacket && h != null && h.getType() == capturePacketType) { + capturedPacket = packet; + } + return packet; + } + } class CustomZooKeeper extends ZooKeeper {