-
Notifications
You must be signed in to change notification settings - Fork 489
/
Copy pathlbbroker3.java
175 lines (149 loc) · 5.65 KB
/
lbbroker3.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
package guide;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Queue;
import org.zeromq.*;
import org.zeromq.ZMQ.PollItem;
import org.zeromq.ZMQ.Socket;
/**
 * Load-balancing broker.
 * <p>
 * Demonstrates use of the ZLoop API and the reactor style: instead of polling
 * the sockets by hand, a handler is registered per socket and the reactor
 * calls it each time a message arrives.
 * <p>
 * The client and worker tasks are identical to those in the previous example.
 */
public class lbbroker3
{
    private static final int NBR_CLIENTS = 10;
    private static final int NBR_WORKERS = 3;

    // Single-byte payload a worker sends to announce that it is available
    private static final byte[] WORKER_READY = { '\001' };

    // One shared instance per handler type; the backend handler hands
    // frontendHandler to the reactor whenever it re-enables the frontend.
    private static final FrontendHandler frontendHandler = new FrontendHandler();
    private static final BackendHandler backendHandler = new BackendHandler();

    /**
     * Basic request-reply client using a REQ socket: sends one "HELLO"
     * request to the frontend endpoint and prints the reply it gets back.
     */
    private static class ClientTask implements ZThread.IDetachedRunnable
    {
        @Override
        public void run(Object[] args)
        {
            // The task owns its context; try-with-resources closes it on exit
            try (ZContext context = new ZContext()) {
                Socket requester = context.createSocket(SocketType.REQ);
                ZHelper.setId(requester); // Set a printable identity
                requester.connect("ipc://frontend.ipc");

                // One request, one reply
                requester.send("HELLO");
                System.out.println("Client: " + requester.recvStr());
            }
        }
    }

    /**
     * Worker using a REQ socket to do load-balancing: announces itself to the
     * backend endpoint with WORKER_READY, then answers every request it
     * receives until a receive returns no message.
     */
    private static class WorkerTask implements ZThread.IDetachedRunnable
    {
        @Override
        public void run(Object[] args)
        {
            // The task owns its context; try-with-resources closes it on exit
            try (ZContext context = new ZContext()) {
                Socket worker = context.createSocket(SocketType.REQ);
                ZHelper.setId(worker); // Set a printable identity
                worker.connect("ipc://backend.ipc");

                // Tell the backend we're ready for work
                new ZFrame(WORKER_READY).send(worker, 0);

                ZMsg request;
                while ((request = ZMsg.recvMsg(worker)) != null) {
                    // Only the last frame is rewritten; the frames in front
                    // of it go back to the broker as they arrived.
                    request.getLast().reset("OK");
                    request.send(worker);
                }
            }
        }
    }

    /**
     * Our load-balancer structure, passed to the reactor handlers as their
     * user argument.
     */
    private static class LBBroker
    {
        Socket frontend;       // Listen to clients
        Socket backend;        // Listen to workers
        Queue<ZFrame> workers; // Addresses of workers that are ready for work
    }

    /**
     * Reactor handler for the frontend (client-facing) socket.
     * <p>
     * Takes the next ready worker off the queue, wraps the client request
     * with that worker's address and sends it out on the backend. The backend
     * handler registers this handler only when the queue goes from empty to
     * one worker, and this handler unregisters itself once the queue is empty
     * again.
     */
    private static class FrontendHandler implements ZLoop.IZLoopHandler
    {
        @Override
        public int handle(ZLoop loop, PollItem item, Object userData)
        {
            LBBroker broker = (LBBroker) userData;

            ZMsg request = ZMsg.recvMsg(broker.frontend);
            if (request == null) {
                return 0;
            }

            // Route the request to the longest-waiting worker
            request.wrap(broker.workers.poll());
            request.send(broker.backend);

            // Cancel reader on frontend if we went from 1 to 0 workers
            if (broker.workers.isEmpty()) {
                loop.removePoller(new PollItem(broker.frontend, 0));
            }
            return 0;
        }
    }

    /**
     * Reactor handler for the backend (worker-facing) socket.
     * <p>
     * Every message from a worker puts that worker's address back on the
     * ready queue. A WORKER_READY payload is then discarded; anything else is
     * forwarded to the frontend.
     */
    private static class BackendHandler implements ZLoop.IZLoopHandler
    {
        @Override
        public int handle(ZLoop loop, PollItem item, Object userData)
        {
            LBBroker broker = (LBBroker) userData;

            ZMsg reply = ZMsg.recvMsg(broker.backend);
            if (reply == null) {
                return 0;
            }

            // Queue worker address for load-balancing
            ZFrame workerAddress = reply.unwrap();
            broker.workers.add(workerAddress);

            // Enable reader on frontend if we went from 0 to 1 workers
            if (broker.workers.size() == 1) {
                PollItem frontendItem = new PollItem(broker.frontend, ZMQ.Poller.POLLIN);
                loop.addPoller(frontendItem, frontendHandler, broker);
            }

            // Forward message to client if it's not a READY
            boolean isReadySignal = Arrays.equals(reply.getFirst().getData(), WORKER_READY);
            if (isReadySignal) {
                reply.destroy();
            }
            else {
                reply.send(broker.frontend);
            }
            return 0;
        }
    }

    /**
     * The main task sets up the broker sockets and the child tasks, then
     * starts its reactor. If you press Ctrl-C, the reactor exits and the main
     * task shuts down.
     */
    public static void main(String[] args)
    {
        try (ZContext context = new ZContext()) {
            // Broker state shared by both handlers
            LBBroker broker = new LBBroker();
            broker.workers = new LinkedList<>(); // Queue of available workers
            broker.frontend = context.createSocket(SocketType.ROUTER);
            broker.backend = context.createSocket(SocketType.ROUTER);
            broker.frontend.bind("ipc://frontend.ipc");
            broker.backend.bind("ipc://backend.ipc");

            // Start the child tasks
            for (int started = 0; started < NBR_CLIENTS; started++) {
                ZThread.start(new ClientTask());
            }
            for (int started = 0; started < NBR_WORKERS; started++) {
                ZThread.start(new WorkerTask());
            }

            // Prepare reactor and fire it up. Only the backend is polled at
            // first; the frontend is added once a worker has reported in.
            ZLoop reactor = new ZLoop(context);
            reactor.addPoller(new PollItem(broker.backend, ZMQ.Poller.POLLIN), backendHandler, broker);
            reactor.start();
        }
    }
}