clone.java
package guide;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.zeromq.*;
import org.zeromq.ZMQ.Poller;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZThread.IAttachedRunnable;
public class clone
{
private final ZContext ctx; // Our context wrapper
private final Socket pipe; // Pipe through to clone agent
// .split constructor and destructor
// Here are the constructor and destructor for the clone class. Note that
// we create a context specifically for the pipe that connects our
// frontend to the backend agent:
public clone()
{
ctx = new ZContext();
pipe = ZThread.fork(ctx, new CloneAgent());
}
public void destroy()
{
ctx.destroy();
}
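// Note that ZThread.fork starts CloneAgent on an attached background thread
// and hands us back our end of an inproc PAIR pipe; destroying the context
// is what eventually shuts the agent down (its poll call then fails).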
// .split subtree method
// Specify subtree for snapshot and updates, which we must do before
// connecting to a server as the subtree specification is sent as the
// first command to the server. Sends a [SUBTREE][subtree] command to
// the agent:
public void subtree(String subtree)
{
ZMsg msg = new ZMsg();
msg.add("SUBTREE");
msg.add(subtree);
msg.send(pipe);
}
// .split connect method
// Connect to a new server endpoint. We can connect to at most two
// servers. Sends [CONNECT][endpoint][service] to the agent:
public void connect(String address, String service)
{
ZMsg msg = new ZMsg();
msg.add("CONNECT");
msg.add(address);
msg.add(service);
msg.send(pipe);
}
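// The service argument is the server's snapshot port; the agent derives the
// subscriber connection from port + 1 and, assuming the guide's model-six
// server layout, publishes its own updates to the collector at port + 2.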
// .split set method
// Set a new value in the shared hashmap. Sends a [SET][key][value][ttl]
// command through to the agent which does the actual work:
public void set(String key, String value, int ttl)
{
ZMsg msg = new ZMsg();
msg.add("SET");
msg.add(key);
msg.add(value);
msg.add(String.format("%d", ttl));
msg.send(pipe);
}
// .split get method
// Look up value in distributed hash table. Sends [GET][key] to the agent and
// waits for a value response. If the key is unknown the agent replies with an
// empty string; null is returned only if the context is interrupted:
public String get(String key)
{
ZMsg msg = new ZMsg();
msg.add("GET");
msg.add(key);
msg.send(pipe);
ZMsg reply = ZMsg.recvMsg(pipe);
if (reply != null) {
String value = reply.popString();
reply.destroy();
return value;
}
return null;
}
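// A minimal usage sketch of this frontend API, mirroring the guide's clonecli6
// example (the endpoints and key below are illustrative, not defined here):
//
//   clone clone = new clone();
//   clone.subtree("/client/");
//   clone.connect("tcp://localhost", "5556");
//   clone.connect("tcp://localhost", "5566");
//   clone.set("/client/key", "value", 0);
//   String value = clone.get("/client/key");
//   clone.destroy();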
// .split working with servers
// The backend agent manages a set of servers, which we implement using
// our simple class model:
private static class Server
{
private final String address; // Server address
private final int port; // Server port
private final Socket snapshot; // Snapshot socket
private final Socket subscriber; // Incoming updates
private long expiry; // When server expires
private int requests; // How many snapshot requests made?
protected Server(ZContext ctx, String address, int port, String subtree)
{
System.out.printf("I: adding server %s:%d...\n", address, port);
this.address = address;
this.port = port;
snapshot = ctx.createSocket(SocketType.DEALER);
snapshot.connect(String.format("%s:%d", address, port));
subscriber = ctx.createSocket(SocketType.SUB);
subscriber.connect(String.format("%s:%d", address, port + 1));
subscriber.subscribe(subtree.getBytes(ZMQ.CHARSET));
}
protected void destroy()
{
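// Nothing to do here: the snapshot and subscriber sockets were created
// through the ZContext and are closed when the context is destroyed.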
}
}
// .split backend agent class
// Here is the implementation of the backend agent itself:
// Number of servers we will talk to
private final static int SERVER_MAX = 2;
// Server considered dead if silent for this long
private final static int SERVER_TTL = 5000; // msecs
// States we can be in
private final static int STATE_INITIAL = 0; // Before asking server for state
private final static int STATE_SYNCING = 1; // Getting state from server
private final static int STATE_ACTIVE = 2; // Getting new updates from server
private static class Agent
{
private final ZContext ctx; // Context wrapper
private final Socket pipe; // Pipe back to application
private final Map<String, String> kvmap; // Actual key/value table
private String subtree; // Subtree specification, if any
private final Server[] server;
private int nbrServers; // 0 to SERVER_MAX
private int state; // Current state
private int curServer; // If active, server 0 or 1
private long sequence; // Last kvmsg processed
private final Socket publisher; // Outgoing updates
protected Agent(ZContext ctx, Socket pipe)
{
this.ctx = ctx;
this.pipe = pipe;
kvmap = new HashMap<>();
subtree = "";
state = STATE_INITIAL;
publisher = ctx.createSocket(SocketType.PUB);
server = new Server[SERVER_MAX];
}
protected void destroy()
{
for (int serverNbr = 0; serverNbr < nbrServers; serverNbr++)
server[serverNbr].destroy();
}
// .split handling a control message
// Here we handle the different control messages from the frontend:
// SUBTREE, CONNECT, SET, and GET:
private boolean controlMessage()
{
ZMsg msg = ZMsg.recvMsg(pipe);
String command = msg.popString();
if (command == null)
return false; // Interrupted
if (command.equals("SUBTREE")) {
subtree = msg.popString();
}
else if (command.equals("CONNECT")) {
String address = msg.popString();
String service = msg.popString();
if (nbrServers < SERVER_MAX) {
server[nbrServers++] = new Server(ctx, address, Integer.parseInt(service), subtree);
// We broadcast updates to all known servers
publisher.connect(String.format("%s:%d", address, Integer.parseInt(service) + 2));
}
else System.out.printf("E: too many servers (max. %d)\n", SERVER_MAX);
}
else
// .split set and get commands
// When we set a property, we push the new key-value pair onto
// all our connected servers:
if (command.equals("SET")) {
String key = msg.popString();
String value = msg.popString();
String ttl = msg.popString();
kvmap.put(key, value);
// Send key-value pair on to server
kvmsg kvmsg = new kvmsg(0);
kvmsg.setKey(key);
kvmsg.setUUID();
kvmsg.fmtBody("%s", value);
kvmsg.setProp("ttl", ttl);
kvmsg.send(publisher);
kvmsg.destroy();
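// The update travels through our publisher to every connected server;
// the guide's servers apply the ttl property (seconds until the key
// expires) and republish the update to all subscribed clients.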
}
else if (command.equals("GET")) {
String key = msg.popString();
String value = kvmap.get(key);
pipe.send(Objects.requireNonNullElse(value, ""));
}
msg.destroy();
return true;
}
}
private static class CloneAgent implements IAttachedRunnable
{
@Override
public void run(Object[] args, ZContext ctx, Socket pipe)
{
Agent self = new Agent(ctx, pipe);
Poller poller = ctx.createPoller(1);
poller.register(pipe, Poller.POLLIN);
while (!Thread.currentThread().isInterrupted()) {
long pollTimer = -1;
int pollSize = 2;
Server server = self.server[self.curServer];
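// server is null until the first CONNECT command arrives, which is why
// the expiry calculation below checks for null before using it.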
switch (self.state) {
case STATE_INITIAL:
// In this state we ask the server for a snapshot,
// if we have a server to talk to...
if (self.nbrServers > 0) {
System.out.printf("I: waiting for server at %s:%d...\n", server.address, server.port);
if (server.requests < 2) {
server.snapshot.sendMore("ICANHAZ?");
server.snapshot.send(self.subtree);
server.requests++;
}
server.expiry = System.currentTimeMillis() + SERVER_TTL;
self.state = STATE_SYNCING;
poller.close();
poller = ctx.createPoller(2);
poller.register(pipe, Poller.POLLIN);
poller.register(server.snapshot, Poller.POLLIN);
}
else pollSize = 1;
break;
case STATE_SYNCING:
// In this state we read from snapshot and we expect
// the server to respond, else we fail over.
poller.close();
poller = ctx.createPoller(2);
poller.register(pipe, Poller.POLLIN);
poller.register(server.snapshot, Poller.POLLIN);
break;
case STATE_ACTIVE:
// In this state we read from subscriber and we expect
// the server to give hugz, else we fail over.
poller.close();
poller = ctx.createPoller(2);
poller.register(pipe, Poller.POLLIN);
poller.register(server.subscriber, Poller.POLLIN);
break;
}
if (server != null) {
pollTimer = server.expiry - System.currentTimeMillis();
if (pollTimer < 0)
pollTimer = 0;
}
// .split client poll loop
// We're ready to process incoming messages; if nothing at all
// comes from our server within the timeout, that means the
// server is dead:
int rc = poller.poll(pollTimer);
if (rc == -1)
break; // Context has been shut down
if (poller.pollin(0)) {
if (!self.controlMessage())
break; // Interrupted
}
else if (pollSize == 2 && poller.pollin(1)) {
kvmsg msg = kvmsg.recv(poller.getSocket(1));
if (msg == null)
break; // Interrupted
// Anything from server resets its expiry time
server.expiry = System.currentTimeMillis() + SERVER_TTL;
if (self.state == STATE_SYNCING) {
// Store in snapshot until we're finished
server.requests = 0;
if (msg.getKey().equals("KTHXBAI")) {
self.sequence = msg.getSequence();
self.state = STATE_ACTIVE;
System.out.printf("I: received from %s:%d snapshot=%d\n", server.address, server.port,
self.sequence);
msg.destroy();
}
}
else if (self.state == STATE_ACTIVE) {
// Discard out-of-sequence updates, incl. hugz
if (msg.getSequence() > self.sequence) {
self.sequence = msg.getSequence();
System.out.printf("I: received from %s:%d update=%d\n", server.address, server.port,
self.sequence);
}
else msg.destroy();
}
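// Note that incoming snapshot entries and updates are never copied into
// kvmap, so get() only returns values set through this same instance.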
}
else {
// Server has died, failover to next
System.out.printf("I: server at %s:%d didn't give hugz\n", server.address, server.port);
self.curServer = (self.curServer + 1) % self.nbrServers;
self.state = STATE_INITIAL;
}
}
self.destroy();
}
}
}
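// This class is not a standalone program; the guide drives it from the
// model-six clone client (clonecli6), talking to a pair of clonesrv6 servers.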