forked from zeromq/jeromq
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathclonesrv3.java
More file actions
102 lines (86 loc) · 3.47 KB
/
clonesrv3.java
File metadata and controls
102 lines (86 loc) · 3.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package guide;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Poller;
import org.zeromq.ZMQ.Socket;
/**
* Clone server Model Three
* @author Danish Shrestha <dshrestha06@gmail.com>
*
*/
public class clonesrv3
{
private static Map<String, kvsimple> kvMap = new LinkedHashMap<String, kvsimple>();
public void run()
{
try (ZContext ctx = new ZContext()) {
Socket snapshot = ctx.createSocket(SocketType.ROUTER);
snapshot.bind("tcp://*:5556");
Socket publisher = ctx.createSocket(SocketType.PUB);
publisher.bind("tcp://*:5557");
Socket collector = ctx.createSocket(SocketType.PULL);
collector.bind("tcp://*:5558");
Poller poller = ctx.createPoller(2);
poller.register(collector, Poller.POLLIN);
poller.register(snapshot, Poller.POLLIN);
long sequence = 0;
while (!Thread.currentThread().isInterrupted()) {
if (poller.poll(1000) < 0)
break; // Context has been shut down
// apply state updates from main thread
if (poller.pollin(0)) {
kvsimple kvMsg = kvsimple.recv(collector);
if (kvMsg == null) // Interrupted
break;
kvMsg.setSequence(++sequence);
kvMsg.send(publisher);
clonesrv3.kvMap.put(kvMsg.getKey(), kvMsg);
System.out.printf("I: publishing update %5d\n", sequence);
}
// execute state snapshot request
if (poller.pollin(1)) {
byte[] identity = snapshot.recv(0);
if (identity == null)
break; // Interrupted
String request = snapshot.recvStr();
if (!request.equals("ICANHAZ?")) {
System.out.println("E: bad request, aborting");
break;
}
Iterator<Entry<String, kvsimple>> iter = kvMap.entrySet()
.iterator();
while (iter.hasNext()) {
Entry<String, kvsimple> entry = iter.next();
kvsimple msg = entry.getValue();
System.out.println(
"Sending message " + entry.getValue().getSequence()
);
this.sendMessage(msg, identity, snapshot);
}
// now send end message with getSequence number
System.out.println("Sending state snapshot = " + sequence);
snapshot.send(identity, ZMQ.SNDMORE);
kvsimple message = new kvsimple(
"KTHXBAI", sequence, ZMQ.SUBSCRIPTION_ALL
);
message.send(snapshot);
}
}
System.out.printf(" Interrupted\n%d messages handled\n", sequence);
}
}
private void sendMessage(kvsimple msg, byte[] identity, Socket snapshot)
{
snapshot.send(identity, ZMQ.SNDMORE);
msg.send(snapshot);
}
public static void main(String[] args)
{
new clonesrv3().run();
}
}