Skip to content

Commit 4432f5b

Browse files
hanmnkalmar
authored andcommitted
ZOOKEEPER-3948: Introduce a deterministic runtime behavior injection framework for ZooKeeperServer testing.
We'd like to understand how applications built on top of ZooKeeper behave under various faulty conditions, which is important to build resilient end to end solutions and avoid ZooKeeper being single point of failure. We'd also like to achieve this in both unit tests (in process) and integration tests (in and out of process). Traditional methods of using external fault injection mechanisms are non deterministic and requires non trivial set up and hard to integrate with unit tests, so here we introduce the ZooKeeperController service which solves both. The basic idea here is to create a controllable ZooKeeperServer which accepts various control commands (such as - delay request, drop request, eat request, expire session, shutdown, trigger leader election, and so on), and reacting based on incoming commands. The controllable server and production server share the same underlying machineries (quorum peers, ZooKeeper server, etc) but code paths are separate, thus this feature has no production impact. This controller system is currently composed of following pieces: CommandClient: a convenient HTTP client to send control commands to controller service. CommandListener: an embed HTTP server listening incoming commands and dispatch to controller service. Controller Service: the service that's responsible to create controllable ZK server and the controller. ZooKeeperServerController: the controller that changes the behavior of ZK server runtime. Controllable Cnx / Factory: controllable connection that accept behavior change requests. In future more control commands and controllable components can be added on top of this framework. This can be used in either unit tests / integration tests as an in process embedded controllable ZooKeeper server, or as an out of process stand alone controllable ZooKeeper process. Author: Michael Han <[email protected]> Reviewers: Enrico Olivelli <[email protected]>, Norbert Kalmar <[email protected]> Closes apache#1467 from hanm/ZOOKEEPER-3948
1 parent 697ec97 commit 4432f5b

18 files changed

+1927
-3
lines changed

Diff for: pom.xml

+5
Original file line numberDiff line numberDiff line change
@@ -601,6 +601,11 @@
601601
<artifactId>jetty-servlet</artifactId>
602602
<version>${jetty.version}</version>
603603
</dependency>
604+
<dependency>
605+
<groupId>org.eclipse.jetty</groupId>
606+
<artifactId>jetty-client</artifactId>
607+
<version>${jetty.version}</version>
608+
</dependency>
604609
<dependency>
605610
<groupId>io.dropwizard.metrics</groupId>
606611
<artifactId>metrics-core</artifactId>

Diff for: zookeeper-server/pom.xml

+5
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,11 @@
8888
<artifactId>jetty-servlet</artifactId>
8989
<scope>provided</scope>
9090
</dependency>
91+
<dependency>
92+
<groupId>org.eclipse.jetty</groupId>
93+
<artifactId>jetty-client</artifactId>
94+
<scope>provided</scope>
95+
</dependency>
9196
<dependency>
9297
<groupId>com.fasterxml.jackson.core</groupId>
9398
<artifactId>jackson-databind</artifactId>

Diff for: zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public class NIOServerCnxn extends ServerCnxn {
6969

7070
private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
7171

72-
private ByteBuffer incomingBuffer = lenBuffer;
72+
protected ByteBuffer incomingBuffer = lenBuffer;
7373

7474
private final Queue<ByteBuffer> outgoingBuffers = new LinkedBlockingQueue<ByteBuffer>();
7575

@@ -381,7 +381,7 @@ void doIO(SelectionKey k) throws InterruptedException {
381381
}
382382
}
383383

384-
private void readRequest() throws IOException {
384+
protected void readRequest() throws IOException {
385385
zkServer.processPacket(this, incomingBuffer);
386386
}
387387

Diff for: zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ private boolean doAccept() {
315315
* If there is no worker thread pool, the SelectorThread performs the I/O
316316
* directly.
317317
*/
318-
class SelectorThread extends AbstractSelectThread {
318+
public class SelectorThread extends AbstractSelectThread {
319319

320320
private final int id;
321321
private final Queue<SocketChannel> acceptedQueue;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.zookeeper.server.controller;
20+
21+
import java.io.IOException;
22+
import java.net.InetSocketAddress;
23+
import java.nio.charset.StandardCharsets;
24+
import java.util.concurrent.TimeUnit;
25+
import org.eclipse.jetty.client.HttpClient;
26+
import org.eclipse.jetty.client.api.ContentResponse;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
/**
31+
* A convenient helper to send controller command to ControllerService.
32+
*/
33+
public class CommandClient {
34+
private final int requestTimeoutInMs;
35+
private static final int DEFAULT_TIMEOUT = 10000;
36+
private static final Logger LOG = LoggerFactory.getLogger(CommandClient.class);
37+
private final int hostPort;
38+
private final String hostName;
39+
private HttpClient client;
40+
private boolean started = false;
41+
42+
/**
43+
* Instantiate a client configured to send requests to localhost.
44+
* @param localHostPort Port that the localhost CommandListener is listening on.
45+
* @param requestTimeoutInMs Timeout in ms for synchronous requests to timeout.
46+
*/
47+
public CommandClient(int localHostPort, int requestTimeoutInMs) {
48+
this.client = new HttpClient();
49+
this.requestTimeoutInMs = requestTimeoutInMs;
50+
this.hostName = "localhost";
51+
this.hostPort = localHostPort;
52+
}
53+
54+
/**
55+
* Instantiate a client configured to send requests to the specified host address.
56+
* @param hostAddress The host address of the listening server.
57+
* @param requestTimeoutInMs Timeout in ms for synchronous requests to timeout.
58+
*/
59+
public CommandClient(InetSocketAddress hostAddress, int requestTimeoutInMs) {
60+
this.client = new HttpClient();
61+
this.requestTimeoutInMs = requestTimeoutInMs;
62+
this.hostName = hostAddress.getHostName();
63+
this.hostPort = hostAddress.getPort();
64+
}
65+
66+
public CommandClient(int localhostPort) {
67+
this(localhostPort, DEFAULT_TIMEOUT);
68+
}
69+
70+
public synchronized void close() {
71+
try {
72+
if (client != null) {
73+
client.stop();
74+
client = null;
75+
}
76+
} catch (Exception ex) {
77+
LOG.warn("Exception during shutdown", ex);
78+
}
79+
}
80+
81+
/**
82+
* Send a command with no parameters to the server and wait for a response.
83+
* Returns true if we received a good (200) response and false otherwise.
84+
*/
85+
public boolean trySendCommand(ControlCommand.Action action) {
86+
return trySendCommand(action, null);
87+
}
88+
89+
/**
90+
* Send a command with an optional command parameter to the server and wait for a response.
91+
* @param action The command Action to send.
92+
* @param commandParameter The command parameter, in the form of command/action/parameter.
93+
* @return true if we received a good (200) response and false otherwise.
94+
*/
95+
public boolean trySendCommand(ControlCommand.Action action, String commandParameter) {
96+
try {
97+
if (!started) {
98+
client.start();
99+
started = true;
100+
}
101+
ContentResponse response = sendCommand(action, commandParameter);
102+
LOG.info("Received {} response from the server", response);
103+
return (response.getStatus() == 200);
104+
} catch (InterruptedException | IOException ex) {
105+
LOG.warn("Failed to get response from server", ex);
106+
} catch (Exception ex) {
107+
LOG.error("Unknown exception when sending command", ex);
108+
}
109+
110+
return false;
111+
}
112+
113+
/**
114+
* Send a command and optional command parameter to the server and block until receiving
115+
* a response.
116+
*
117+
* @param action The command Action to send.
118+
* @param commandParameter The command parameter, in the form of command/action/parameter.
119+
* @return The full response body from the CommandListener server.
120+
*/
121+
public ContentResponse sendCommand(ControlCommand.Action action,
122+
String commandParameter) throws Exception {
123+
String command = String.format("%s%s:%s/%s", "http://",
124+
this.hostName, this.hostPort, ControlCommand.createCommandUri(action, commandParameter));
125+
ContentResponse response = this.client.newRequest(command).timeout(this.requestTimeoutInMs,
126+
TimeUnit.MILLISECONDS).send();
127+
LOG.info("Sent command {}", command);
128+
LOG.info("Response body {}", new String(response.getContent(), StandardCharsets.UTF_8));
129+
return response;
130+
}
131+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.zookeeper.server.controller;
20+
21+
import java.io.IOException;
22+
import javax.servlet.http.HttpServletRequest;
23+
import javax.servlet.http.HttpServletResponse;
24+
import org.apache.zookeeper.server.ExitCode;
25+
import org.apache.zookeeper.util.ServiceUtils;
26+
import org.eclipse.jetty.server.Request;
27+
import org.eclipse.jetty.server.Server;
28+
import org.eclipse.jetty.server.handler.AbstractHandler;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
/**
33+
* An HTTP server listening to incoming controller commands sent from CommandClient (or any of your favorite REST client
34+
* ) and dispatching the command to the ZooKeeperServerController for execution.
35+
*/
36+
public class CommandListener {
37+
private static final Logger LOG = LoggerFactory.getLogger(CommandListener.class);
38+
39+
private ZooKeeperServerController controller;
40+
private Server server;
41+
42+
public CommandListener(ZooKeeperServerController controller, ControllerServerConfig config) {
43+
try {
44+
this.controller = controller;
45+
46+
String host = config.getControllerAddress().getHostName();
47+
int port = config.getControllerAddress().getPort();
48+
49+
server = new Server(port);
50+
LOG.info("CommandListener server host: {} with port: {}", host, port);
51+
server.setHandler(new CommandHandler());
52+
server.start();
53+
} catch (Exception ex) {
54+
LOG.error("Failed to instantiate CommandListener.", ex);
55+
ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
56+
}
57+
}
58+
59+
public void close() {
60+
try {
61+
if (server != null) {
62+
server.stop();
63+
server = null;
64+
}
65+
} catch (Exception ex) {
66+
LOG.warn("Exception during shutdown CommandListener server", ex);
67+
}
68+
}
69+
70+
private class CommandHandler extends AbstractHandler {
71+
@Override
72+
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
73+
throws IOException {
74+
// Extract command string from request path. Remove leading '/'.
75+
String commandStr = request.getPathInfo().substring(1);
76+
int responseCode;
77+
response.setContentType("text/html;charset=utf-8");
78+
79+
try {
80+
ControlCommand command = ControlCommand.parseUri(commandStr);
81+
controller.processCommand(command);
82+
baseRequest.setHandled(true);
83+
responseCode = HttpServletResponse.SC_OK;
84+
} catch (IllegalArgumentException ex) {
85+
LOG.error("Bad argument or command", ex);
86+
responseCode = HttpServletResponse.SC_BAD_REQUEST;
87+
} catch (Exception ex) {
88+
LOG.error("Failed processing the request", ex);
89+
throw ex;
90+
}
91+
response.setStatus(responseCode);
92+
response.getWriter().println(commandStr);
93+
LOG.info("CommandListener processed command {} with response code {}", commandStr, responseCode);
94+
}
95+
}
96+
}

0 commit comments

Comments
 (0)