Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

package org.apache.ignite.internal.managers.discovery;

import org.apache.ignite.internal.codegen.DiscoveryDataPacketSerializer;
import org.apache.ignite.internal.codegen.InetAddressMessageSerializer;
import org.apache.ignite.internal.codegen.InetSocketAddressMessageSerializer;
import org.apache.ignite.internal.codegen.NodeSpecificDataSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryAuthFailedMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryCacheMetricsMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryCheckFailedMessageSerializer;
Expand All @@ -34,15 +36,18 @@
import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeResponseSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryLoopbackProblemMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryMetricsUpdateMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryNodeAddFinishedMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryNodeFullMetricsMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryNodeMetricsMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryRingLatencyCheckMessageSerializer;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage;
import org.apache.ignite.spi.discovery.tcp.messages.InetSocketAddressMessage;
import org.apache.ignite.spi.discovery.tcp.messages.NodeSpecificData;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAuthFailedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCacheMetricsMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage;
Expand All @@ -58,6 +63,7 @@
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFullMetricsMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeMetricsMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest;
Expand All @@ -68,6 +74,8 @@
public class DiscoveryMessageFactory implements MessageFactoryProvider {
/** {@inheritDoc} */
@Override public void registerAll(MessageFactory factory) {
factory.register((short)-107, NodeSpecificData::new, new NodeSpecificDataSerializer());
factory.register((short)-106, DiscoveryDataPacket::new, new DiscoveryDataPacketSerializer());
factory.register((short)-105, TcpDiscoveryNodeFullMetricsMessage::new,
new TcpDiscoveryNodeFullMetricsMessageSerializer());
factory.register((short)-104, TcpDiscoveryClientNodesMetricsMessage::new, new TcpDiscoveryClientNodesMetricsMessageSerializer());
Expand All @@ -92,5 +100,6 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider {
factory.register((short)13, TcpDiscoveryClientMetricsUpdateMessage::new, new TcpDiscoveryClientMetricsUpdateMessageSerializer());
factory.register((short)14, TcpDiscoveryMetricsUpdateMessage::new, new TcpDiscoveryMetricsUpdateMessageSerializer());
factory.register((short)15, TcpDiscoveryClientAckResponse::new, new TcpDiscoveryClientAckResponseSerializer());
factory.register((short)16, TcpDiscoveryNodeAddFinishedMessage::new, new TcpDiscoveryNodeAddFinishedMessageSerializer());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2303,6 +2303,13 @@ private void processNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage ms
delayDiscoData.clear();
}

try {
msg.finishUnmarshal();
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed deserealization msg", e); // TODO fix err msg
}

locNode.setAttributes(msg.clientNodeAttributes());

clearNodeSensitiveData(locNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3446,6 +3446,15 @@ private void sendMessageAcrossRing(TcpDiscoveryAbstractMessage msg) {
if (msg instanceof TraceableMessage)
tracing.messages().beforeSend((TraceableMessage)msg);

if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) {
try {
((TcpDiscoveryNodeAddFinishedMessage)msg).prepareMarshal();
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}

sendMessageToClients(msg);

List<TcpDiscoveryNode> failedNodes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,42 +28,54 @@
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridComponent;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
import org.apache.ignite.spi.discovery.tcp.messages.NodeSpecificData;

import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CONTINUOUS_PROC;

/**
* Carries discovery data in marshalled form
* and allows convenient way of converting it to and from {@link DiscoveryDataBag} objects.
*/
public class DiscoveryDataPacket implements Serializable {
/** Local file header signature(read as a little-endian number). */
private static int ZIP_HEADER_SIGNATURE = 0x04034b50;
public class DiscoveryDataPacket implements Serializable, Message {
/** Local file header signature (read as a little-endian number). */
private static final int ZIP_HEADER_SIGNATURE = 0x04034b50;

/** */
private static final long serialVersionUID = 0L;

/** */
private final UUID joiningNodeId;
@Order(0)
private UUID joiningNodeId;

/** */
@Order(1)
private Map<Integer, byte[]> joiningNodeData = new HashMap<>();

/** */
private transient Map<Integer, Serializable> unmarshalledJoiningNodeData;

/** */
@Order(2)
private Map<Integer, byte[]> commonData = new HashMap<>();

/** */
private Map<UUID, Map<Integer, byte[]>> nodeSpecificData = new LinkedHashMap<>();
@Order(3)
private Map<UUID, NodeSpecificData> nodeSpecificData = new LinkedHashMap<>();

/** */
private transient boolean joiningNodeClient;

/** Constructor. */
public DiscoveryDataPacket() {
// No-op.
}

/**
* @param joiningNodeId Joining node id.
*/
Expand All @@ -78,6 +90,55 @@ public UUID joiningNodeId() {
return joiningNodeId;
}

/**
* @param joiningNodeId Joining node ID.
*/
public void joiningNodeId(UUID joiningNodeId) {
this.joiningNodeId = joiningNodeId;
}

/**
* @return Joining node data.
*/
public Map<Integer, byte[]> joiningNodeData() {
return joiningNodeData;
}

/**
* @param joiningNodeData Joining node data.
*/
public void joiningNodeData(Map<Integer, byte[]> joiningNodeData) {
this.joiningNodeData = joiningNodeData;
}

/**
* @return Common data.
*/
public Map<Integer, byte[]> commonData() {
return commonData;
}

/**
* @param commonData Common data.
*/
public void commonData(Map<Integer, byte[]> commonData) {
this.commonData = commonData;
}

/**
* @return Node specific data.
*/
public Map<UUID, NodeSpecificData> nodeSpecificData() {
return nodeSpecificData;
}

/**
* @param nodeSpecificData New node specific data.
*/
public void nodeSpecificData(Map<UUID, NodeSpecificData> nodeSpecificData) {
this.nodeSpecificData = nodeSpecificData;
}

/**
* @param bag Bag.
* @param nodeId Node id.
Expand All @@ -98,7 +159,7 @@ public void marshalGridNodeData(DiscoveryDataBag bag, UUID nodeId, Marshaller ma
filterDuplicatedData(marshLocNodeSpecificData);

if (!marshLocNodeSpecificData.isEmpty())
nodeSpecificData.put(nodeId, marshLocNodeSpecificData);
nodeSpecificData.put(nodeId, new NodeSpecificData(marshLocNodeSpecificData));
}
}

Expand Down Expand Up @@ -132,8 +193,11 @@ public DiscoveryDataBag unmarshalGridData(
if (nodeSpecificData != null && !nodeSpecificData.isEmpty()) {
Map<UUID, Map<Integer, Serializable>> unmarshNodeSpecData = U.newLinkedHashMap(nodeSpecificData.size());

for (Map.Entry<UUID, Map<Integer, byte[]>> nodeBinEntry : nodeSpecificData.entrySet()) {
Map<Integer, byte[]> nodeBinData = nodeBinEntry.getValue();
for (Map.Entry<UUID, NodeSpecificData> nodeBinEntry : nodeSpecificData.entrySet()) {
if (nodeBinEntry.getValue() == null)
continue;

Map<Integer, byte[]> nodeBinData = nodeBinEntry.getValue().nodeSpecificData();

if (nodeBinData == null || nodeBinData.isEmpty())
continue;
Expand Down Expand Up @@ -260,12 +324,17 @@ public boolean mergeDataFrom(
}

if (nodeSpecificData.size() != mrgdSpecifDataKeys.size()) {
for (Map.Entry<UUID, Map<Integer, byte[]>> e : nodeSpecificData.entrySet()) {
for (Map.Entry<UUID, NodeSpecificData> e : nodeSpecificData.entrySet()) {
if (!mrgdSpecifDataKeys.contains(e.getKey())) {
Map<Integer, byte[]> data = existingDataPacket.nodeSpecificData.get(e.getKey());
NodeSpecificData dataMsg = existingDataPacket.nodeSpecificData.get(e.getKey());

if (data != null && mapsEqual(e.getValue(), data)) {
e.setValue(data);
if (dataMsg == null)
continue;

Map<Integer, byte[]> data = dataMsg.nodeSpecificData();

if (data != null && mapsEqual(e.getValue().nodeSpecificData(), data)) {
e.setValue(new NodeSpecificData(data));

boolean add = mrgdSpecifDataKeys.add(e.getKey());

Expand Down Expand Up @@ -310,7 +379,7 @@ private boolean mapsEqual(Map<Integer, byte[]> m1, Map<Integer, byte[]> m2) {
* @param clientNode Client node.
* @param log Logger.
* @param panic Throw unmarshalling if {@code true}.
* @throws IgniteCheckedException If {@code panic} is {@true} and unmarshalling failed.
* @throws IgniteCheckedException If {@code panic} is {@code True} and unmarshalling failed.
*/
private Map<Integer, Serializable> unmarshalData(
Map<Integer, byte[]> src,
Expand Down Expand Up @@ -358,11 +427,11 @@ else if (binEntry.getKey() < GridComponent.DiscoveryDataExchangeType.VALUES.leng
}

/**
* @param value Value to check.
* @param val Value to check.
* @return {@code true} if value is zipped.
*/
private boolean isZipped(byte[] value) {
return value != null && value.length > 3 && makeInt(value) == ZIP_HEADER_SIGNATURE;
private boolean isZipped(byte[] val) {
return val != null && val.length > 3 && makeInt(val) == ZIP_HEADER_SIGNATURE;
}

/**
Expand Down Expand Up @@ -391,7 +460,7 @@ private void marshalData(
int compressionLevel,
IgniteLogger log
) {
//may happen if nothing was collected from components,
// may happen if nothing was collected from components,
// corresponding map (for common data or for node specific data) left null
if (src == null)
return;
Expand All @@ -411,13 +480,15 @@ private void marshalData(
* TODO https://issues.apache.org/jira/browse/IGNITE-4435
*/
private void filterDuplicatedData(Map<Integer, byte[]> discoData) {
for (Map<Integer, byte[]> existingData : nodeSpecificData.values()) {
for (NodeSpecificData existingData : nodeSpecificData.values()) {
Iterator<Map.Entry<Integer, byte[]>> it = discoData.entrySet().iterator();

while (it.hasNext()) {
Map.Entry<Integer, byte[]> discoDataEntry = it.next();

byte[] curData = existingData.get(discoDataEntry.getKey());
byte[] curData = (existingData == null || existingData.nodeSpecificData() == null)
? null
: existingData.nodeSpecificData().get(discoDataEntry.getKey());

if (Arrays.equals(curData, discoDataEntry.getValue()))
it.remove();
Expand Down Expand Up @@ -454,4 +525,9 @@ public void joiningNodeClient(boolean joiningNodeClient) {
public void clearUnmarshalledJoiningNodeData() {
unmarshalledJoiningNodeData = null;
}

/** {@inheritDoc} */
@Override public short directType() {
return -106;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.spi.discovery.tcp.messages;

import java.io.Serializable;
import java.util.Map;
import java.util.Objects;
import org.apache.ignite.internal.Order;
import org.apache.ignite.plugin.extensions.communication.Message;

/** */
public class NodeSpecificData implements Message, Serializable {
/** */
private static final long serialVersionUID = 0L;

/** */
@Order(0)
private Map<Integer, byte[]> nodeSpecificData;

/** */
public NodeSpecificData() {
// No-op.
}

/**
* @param nodeSpecificData Node specific data.
*/
public NodeSpecificData(Map<Integer, byte[]> nodeSpecificData) {
this.nodeSpecificData = nodeSpecificData;
}

/**
* @return Node specific data.
*/
public Map<Integer, byte[]> nodeSpecificData() {
return nodeSpecificData;
}

/**
* @param nodeSpecificData New node specific data.
*/
public void nodeSpecificData(Map<Integer, byte[]> nodeSpecificData) {
this.nodeSpecificData = nodeSpecificData;
}

/** {@inheritDoc} */
@Override public short directType() {
return -107;
}

/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;

if (o == null || getClass() != o.getClass())
return false;

NodeSpecificData that = (NodeSpecificData)o;

return Objects.equals(nodeSpecificData, that.nodeSpecificData);
}

/** {@inheritDoc} */
@Override public int hashCode() {
return Objects.hashCode(nodeSpecificData);
}
}
Loading