Skip to content

Commit dbeb7d1

Browse files
authored
IGNITE-27604 Use MessageSerializer for TcpDiscoveryNodeAddFinishedMessage (#12644)
1 parent 978bbeb commit dbeb7d1

File tree

7 files changed

+289
-31
lines changed

7 files changed

+289
-31
lines changed

modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717

1818
package org.apache.ignite.internal.managers.discovery;
1919

20+
import org.apache.ignite.internal.codegen.DiscoveryDataPacketSerializer;
2021
import org.apache.ignite.internal.codegen.InetAddressMessageSerializer;
2122
import org.apache.ignite.internal.codegen.InetSocketAddressMessageSerializer;
23+
import org.apache.ignite.internal.codegen.NodeSpecificDataSerializer;
2224
import org.apache.ignite.internal.codegen.TcpDiscoveryAuthFailedMessageSerializer;
2325
import org.apache.ignite.internal.codegen.TcpDiscoveryCacheMetricsMessageSerializer;
2426
import org.apache.ignite.internal.codegen.TcpDiscoveryCheckFailedMessageSerializer;
@@ -34,6 +36,7 @@
3436
import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeResponseSerializer;
3537
import org.apache.ignite.internal.codegen.TcpDiscoveryLoopbackProblemMessageSerializer;
3638
import org.apache.ignite.internal.codegen.TcpDiscoveryMetricsUpdateMessageSerializer;
39+
import org.apache.ignite.internal.codegen.TcpDiscoveryNodeAddFinishedMessageSerializer;
3740
import org.apache.ignite.internal.codegen.TcpDiscoveryNodeFailedMessageSerializer;
3841
import org.apache.ignite.internal.codegen.TcpDiscoveryNodeFullMetricsMessageSerializer;
3942
import org.apache.ignite.internal.codegen.TcpDiscoveryNodeLeftMessageSerializer;
@@ -44,8 +47,10 @@
4447
import org.apache.ignite.internal.codegen.TcpDiscoveryStatusCheckMessageSerializer;
4548
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
4649
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
50+
import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
4751
import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage;
4852
import org.apache.ignite.spi.discovery.tcp.messages.InetSocketAddressMessage;
53+
import org.apache.ignite.spi.discovery.tcp.messages.NodeSpecificData;
4954
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAuthFailedMessage;
5055
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCacheMetricsMessage;
5156
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage;
@@ -61,6 +66,7 @@
6166
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse;
6267
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage;
6368
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage;
69+
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
6470
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage;
6571
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFullMetricsMessage;
6672
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage;
@@ -74,6 +80,8 @@
7480
public class DiscoveryMessageFactory implements MessageFactoryProvider {
7581
/** {@inheritDoc} */
7682
@Override public void registerAll(MessageFactory factory) {
83+
factory.register((short)-107, NodeSpecificData::new, new NodeSpecificDataSerializer());
84+
factory.register((short)-106, DiscoveryDataPacket::new, new DiscoveryDataPacketSerializer());
7785
factory.register((short)-105, TcpDiscoveryNodeFullMetricsMessage::new,
7886
new TcpDiscoveryNodeFullMetricsMessageSerializer());
7987
factory.register((short)-104, TcpDiscoveryClientNodesMetricsMessage::new, new TcpDiscoveryClientNodesMetricsMessageSerializer());
@@ -101,5 +109,6 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider {
101109
factory.register((short)16, TcpDiscoveryNodeLeftMessage::new, new TcpDiscoveryNodeLeftMessageSerializer());
102110
factory.register((short)17, TcpDiscoveryNodeFailedMessage::new, new TcpDiscoveryNodeFailedMessageSerializer());
103111
factory.register((short)18, TcpDiscoveryStatusCheckMessage::new, new TcpDiscoveryStatusCheckMessageSerializer());
112+
factory.register((short)19, TcpDiscoveryNodeAddFinishedMessage::new, new TcpDiscoveryNodeAddFinishedMessageSerializer());
104113
}
105114
}

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2307,6 +2307,8 @@ private void processNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage ms
23072307
delayDiscoData.clear();
23082308
}
23092309

2310+
msg.finishUnmarshal(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration()));
2311+
23102312
locNode.setAttributes(msg.clientNodeAttributes());
23112313

23122314
clearNodeSensitiveData(locNode);

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2489,6 +2489,8 @@ else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) {
24892489
if (addFinishMsg.clientDiscoData() != null) {
24902490
addFinishMsg = new TcpDiscoveryNodeAddFinishedMessage(addFinishMsg);
24912491

2492+
addFinishMsg.prepareMarshal(spi.marshaller());
2493+
24922494
msg = addFinishMsg;
24932495

24942496
DiscoveryDataPacket discoData = addFinishMsg.clientDiscoData();
@@ -4833,6 +4835,8 @@ private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) {
48334835
addFinishMsg.clientDiscoData(msg.gridDiscoveryData());
48344836

48354837
addFinishMsg.clientNodeAttributes(node.attributes());
4838+
4839+
addFinishMsg.prepareMarshal(spi.marshaller());
48364840
}
48374841

48384842
addFinishMsg = tracing.messages().branch(addFinishMsg, msg);

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java

Lines changed: 96 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -21,49 +21,60 @@
2121
import java.util.Collection;
2222
import java.util.HashMap;
2323
import java.util.Iterator;
24-
import java.util.LinkedHashMap;
2524
import java.util.Map;
2625
import java.util.UUID;
2726
import org.apache.ignite.IgniteCheckedException;
2827
import org.apache.ignite.IgniteException;
2928
import org.apache.ignite.IgniteLogger;
3029
import org.apache.ignite.internal.GridComponent;
30+
import org.apache.ignite.internal.Order;
3131
import org.apache.ignite.internal.util.typedef.X;
3232
import org.apache.ignite.internal.util.typedef.internal.U;
3333
import org.apache.ignite.marshaller.Marshaller;
34+
import org.apache.ignite.plugin.extensions.communication.Message;
3435
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
36+
import org.apache.ignite.spi.discovery.tcp.messages.NodeSpecificData;
3537

3638
import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CONTINUOUS_PROC;
3739

3840
/**
3941
* Carries discovery data in marshalled form
4042
* and allows convenient way of converting it to and from {@link DiscoveryDataBag} objects.
4143
*/
42-
public class DiscoveryDataPacket implements Serializable {
43-
/** Local file header signature(read as a little-endian number). */
44-
private static int ZIP_HEADER_SIGNATURE = 0x04034b50;
44+
public class DiscoveryDataPacket implements Serializable, Message {
45+
/** Local file header signature (read as a little-endian number). */
46+
private static final int ZIP_HEADER_SIGNATURE = 0x04034b50;
4547

4648
/** */
4749
private static final long serialVersionUID = 0L;
4850

4951
/** */
50-
private final UUID joiningNodeId;
52+
@Order(0)
53+
private UUID joiningNodeId;
5154

5255
/** */
56+
@Order(1)
5357
private Map<Integer, byte[]> joiningNodeData = new HashMap<>();
5458

5559
/** */
5660
private transient Map<Integer, Serializable> unmarshalledJoiningNodeData;
5761

5862
/** */
63+
@Order(2)
5964
private Map<Integer, byte[]> commonData = new HashMap<>();
6065

6166
/** */
62-
private Map<UUID, Map<Integer, byte[]>> nodeSpecificData = new LinkedHashMap<>();
67+
@Order(3)
68+
private Map<UUID, NodeSpecificData> nodeSpecificData = new HashMap<>();
6369

6470
/** */
6571
private transient boolean joiningNodeClient;
6672

73+
/** Constructor. */
74+
public DiscoveryDataPacket() {
75+
// No-op.
76+
}
77+
6778
/**
6879
* @param joiningNodeId Joining node id.
6980
*/
@@ -78,6 +89,55 @@ public UUID joiningNodeId() {
7889
return joiningNodeId;
7990
}
8091

92+
/**
93+
* @param joiningNodeId Joining node ID.
94+
*/
95+
public void joiningNodeId(UUID joiningNodeId) {
96+
this.joiningNodeId = joiningNodeId;
97+
}
98+
99+
/**
100+
* @return Joining node data.
101+
*/
102+
public Map<Integer, byte[]> joiningNodeData() {
103+
return joiningNodeData;
104+
}
105+
106+
/**
107+
* @param joiningNodeData Joining node data.
108+
*/
109+
public void joiningNodeData(Map<Integer, byte[]> joiningNodeData) {
110+
this.joiningNodeData = joiningNodeData;
111+
}
112+
113+
/**
114+
* @return Common data.
115+
*/
116+
public Map<Integer, byte[]> commonData() {
117+
return commonData;
118+
}
119+
120+
/**
121+
* @param commonData Common data.
122+
*/
123+
public void commonData(Map<Integer, byte[]> commonData) {
124+
this.commonData = commonData;
125+
}
126+
127+
/**
128+
* @return Node specific data.
129+
*/
130+
public Map<UUID, NodeSpecificData> nodeSpecificData() {
131+
return nodeSpecificData;
132+
}
133+
134+
/**
135+
* @param nodeSpecificData New node specific data.
136+
*/
137+
public void nodeSpecificData(Map<UUID, NodeSpecificData> nodeSpecificData) {
138+
this.nodeSpecificData = nodeSpecificData;
139+
}
140+
81141
/**
82142
* @param bag Bag.
83143
* @param nodeId Node id.
@@ -98,7 +158,7 @@ public void marshalGridNodeData(DiscoveryDataBag bag, UUID nodeId, Marshaller ma
98158
filterDuplicatedData(marshLocNodeSpecificData);
99159

100160
if (!marshLocNodeSpecificData.isEmpty())
101-
nodeSpecificData.put(nodeId, marshLocNodeSpecificData);
161+
nodeSpecificData.put(nodeId, new NodeSpecificData(marshLocNodeSpecificData));
102162
}
103163
}
104164

@@ -132,8 +192,11 @@ public DiscoveryDataBag unmarshalGridData(
132192
if (nodeSpecificData != null && !nodeSpecificData.isEmpty()) {
133193
Map<UUID, Map<Integer, Serializable>> unmarshNodeSpecData = U.newLinkedHashMap(nodeSpecificData.size());
134194

135-
for (Map.Entry<UUID, Map<Integer, byte[]>> nodeBinEntry : nodeSpecificData.entrySet()) {
136-
Map<Integer, byte[]> nodeBinData = nodeBinEntry.getValue();
195+
for (Map.Entry<UUID, NodeSpecificData> nodeBinEntry : nodeSpecificData.entrySet()) {
196+
if (nodeBinEntry.getValue() == null)
197+
continue;
198+
199+
Map<Integer, byte[]> nodeBinData = nodeBinEntry.getValue().nodeSpecificData();
137200

138201
if (nodeBinData == null || nodeBinData.isEmpty())
139202
continue;
@@ -260,12 +323,17 @@ public boolean mergeDataFrom(
260323
}
261324

262325
if (nodeSpecificData.size() != mrgdSpecifDataKeys.size()) {
263-
for (Map.Entry<UUID, Map<Integer, byte[]>> e : nodeSpecificData.entrySet()) {
326+
for (Map.Entry<UUID, NodeSpecificData> e : nodeSpecificData.entrySet()) {
264327
if (!mrgdSpecifDataKeys.contains(e.getKey())) {
265-
Map<Integer, byte[]> data = existingDataPacket.nodeSpecificData.get(e.getKey());
328+
NodeSpecificData dataMsg = existingDataPacket.nodeSpecificData.get(e.getKey());
266329

267-
if (data != null && mapsEqual(e.getValue(), data)) {
268-
e.setValue(data);
330+
if (dataMsg == null)
331+
continue;
332+
333+
Map<Integer, byte[]> data = dataMsg.nodeSpecificData();
334+
335+
if (data != null && mapsEqual(e.getValue().nodeSpecificData(), data)) {
336+
e.setValue(new NodeSpecificData(data));
269337

270338
boolean add = mrgdSpecifDataKeys.add(e.getKey());
271339

@@ -310,7 +378,7 @@ private boolean mapsEqual(Map<Integer, byte[]> m1, Map<Integer, byte[]> m2) {
310378
* @param clientNode Client node.
311379
* @param log Logger.
312380
* @param panic Throw unmarshalling if {@code true}.
313-
* @throws IgniteCheckedException If {@code panic} is {@true} and unmarshalling failed.
381+
* @throws IgniteCheckedException If {@code panic} is {@code True} and unmarshalling failed.
314382
*/
315383
private Map<Integer, Serializable> unmarshalData(
316384
Map<Integer, byte[]> src,
@@ -358,11 +426,11 @@ else if (binEntry.getKey() < GridComponent.DiscoveryDataExchangeType.VALUES.leng
358426
}
359427

360428
/**
361-
* @param value Value to check.
429+
* @param val Value to check.
362430
* @return {@code true} if value is zipped.
363431
*/
364-
private boolean isZipped(byte[] value) {
365-
return value != null && value.length > 3 && makeInt(value) == ZIP_HEADER_SIGNATURE;
432+
private boolean isZipped(byte[] val) {
433+
return val != null && val.length > 3 && makeInt(val) == ZIP_HEADER_SIGNATURE;
366434
}
367435

368436
/**
@@ -391,7 +459,7 @@ private void marshalData(
391459
int compressionLevel,
392460
IgniteLogger log
393461
) {
394-
//may happen if nothing was collected from components,
462+
// may happen if nothing was collected from components,
395463
// corresponding map (for common data or for node specific data) left null
396464
if (src == null)
397465
return;
@@ -407,17 +475,17 @@ private void marshalData(
407475
}
408476
}
409477

410-
/**
411-
* TODO https://issues.apache.org/jira/browse/IGNITE-4435
412-
*/
478+
/** */
413479
private void filterDuplicatedData(Map<Integer, byte[]> discoData) {
414-
for (Map<Integer, byte[]> existingData : nodeSpecificData.values()) {
480+
for (NodeSpecificData existingData : nodeSpecificData.values()) {
415481
Iterator<Map.Entry<Integer, byte[]>> it = discoData.entrySet().iterator();
416482

417483
while (it.hasNext()) {
418484
Map.Entry<Integer, byte[]> discoDataEntry = it.next();
419485

420-
byte[] curData = existingData.get(discoDataEntry.getKey());
486+
byte[] curData = (existingData == null || existingData.nodeSpecificData() == null)
487+
? null
488+
: existingData.nodeSpecificData().get(discoDataEntry.getKey());
421489

422490
if (Arrays.equals(curData, discoDataEntry.getValue()))
423491
it.remove();
@@ -454,4 +522,9 @@ public void joiningNodeClient(boolean joiningNodeClient) {
454522
public void clearUnmarshalledJoiningNodeData() {
455523
unmarshalledJoiningNodeData = null;
456524
}
525+
526+
/** {@inheritDoc} */
527+
@Override public short directType() {
528+
return -106;
529+
}
457530
}

0 commit comments

Comments
 (0)