Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
227c3a1
raw
Vladsz83 Feb 18, 2026
1e1dd00
fix
Vladsz83 Feb 18, 2026
43f4267
fix
Vladsz83 Feb 18, 2026
d68b352
fix
Vladsz83 Feb 19, 2026
5ac43ab
Merge branch 'master' into Message-serializer-for-TcpDiscoveryNodeAdd…
Vladsz83 Feb 19, 2026
1064805
refactoring. + dedicated if
Vladsz83 Feb 19, 2026
4adf01a
impl
Vladsz83 Feb 19, 2026
330d359
Revert "impl"
Vladsz83 Feb 19, 2026
1baa2f2
Revert "refactoring. + dedicated if"
Vladsz83 Feb 19, 2026
9067b9b
fix the serialization
Vladsz83 Feb 19, 2026
d4e6b2b
fixes
Vladsz83 Feb 20, 2026
fb52e0e
fixes
Vladsz83 Feb 20, 2026
333185b
fixes
Vladsz83 Feb 21, 2026
31a3d29
lost serialization fix
Vladsz83 Feb 21, 2026
0a2b97e
Merge branch 'master' into TcpDiscoveryNodeAddedMessage
Vladsz83 Feb 21, 2026
7272166
+ master
Vladsz83 Feb 21, 2026
31c46d9
minor
Vladsz83 Feb 21, 2026
e3e7ae2
impl
Vladsz83 Feb 21, 2026
629d051
impl
Vladsz83 Feb 22, 2026
c8a1ac8
fix
Vladsz83 Feb 22, 2026
ff8a9ee
fix
Vladsz83 Feb 22, 2026
15000d1
+ datapacket serr
Vladsz83 Feb 22, 2026
7b1351d
fix
Vladsz83 Feb 23, 2026
c1d3f81
minority
Vladsz83 Feb 23, 2026
0e72fe3
Merge branch 'master' into IGNITE-27556-TcpDiscoveryNodeAddedMessage_…
Vladsz83 Feb 24, 2026
ccc0c85
merged master
Vladsz83 Feb 24, 2026
e75a62c
reserach
Vladsz83 Feb 24, 2026
a9b9ed4
Revert "reserach"
Vladsz83 Feb 24, 2026
f47b463
Merge branch 'master' into IGNITE-27556-TcpDiscoveryNodeAddedMessage_…
Vladsz83 Feb 24, 2026
d3aa143
impl
Vladsz83 Feb 24, 2026
a1f9eac
+ TcpDiscoveryJoinRequestMessage
Vladsz83 Feb 24, 2026
0b45e48
raw
Vladsz83 Feb 25, 2026
317fc54
cleanup
Vladsz83 Feb 25, 2026
c66b10c
cleanup & fix
Vladsz83 Feb 25, 2026
15d65fc
remove extrnalizable from TcpDiscoveryNode
Vladsz83 Feb 25, 2026
2d708cc
Merge branch 'master' into IGNITE-27556-TcpDiscoveryNodeAddedMessage_…
Vladsz83 Feb 25, 2026
7dcbdcf
- externalizable TcpDiscoveryNode
Vladsz83 Feb 25, 2026
c89a94e
Merge branch 'master' into IGNITE-27899-Make-TcpDiscoveryNode-impleme…
Vladsz83 Feb 28, 2026
4c8073e
+ master
Vladsz83 Feb 28, 2026
890ba23
+ master
Vladsz83 Feb 28, 2026
1131fa7
fixes
Vladsz83 Feb 28, 2026
95f8f83
research
Vladsz83 Feb 28, 2026
cb0e819
minority
Vladsz83 Mar 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@

package org.apache.ignite.events;

import java.io.Externalizable;
import java.io.Serializable;
import java.util.Collection;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.tracing.Span;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.discovery.tcp.internal.ExternalizableTcpDiscoveryNode;
import org.jetbrains.annotations.Nullable;

/**
Expand Down Expand Up @@ -69,13 +72,13 @@ public class DiscoveryEvent extends EventAdapter {
/** */
private static final long serialVersionUID = 0L;

/** */
/** Cluster node. Has to be {@link Externalizable} or {@link Serializable}. */
private ClusterNode evtNode;

/** Topology version. */
private long topVer;

/** Collection of nodes corresponding to topology version. */
/** Collection of nodes corresponding to topology version. Has to be {@link Externalizable} or {@link Serializable}. */
private Collection<ClusterNode> topSnapshot;

/** Template to generate {@link #message()} lazily. Will be joined with {@link #eventNode()} converted to string. */
Expand Down Expand Up @@ -107,7 +110,7 @@ public DiscoveryEvent() {
public DiscoveryEvent(ClusterNode node, String msg, int type, ClusterNode evtNode) {
super(node, msg, type);

this.evtNode = evtNode;
eventNode(evtNode);
}

/**
Expand All @@ -116,7 +119,7 @@ public DiscoveryEvent(ClusterNode node, String msg, int type, ClusterNode evtNod
* @param evtNode Event node.
*/
public void eventNode(ClusterNode evtNode) {
this.evtNode = evtNode;
this.evtNode = ExternalizableTcpDiscoveryNode.of(evtNode);
}

/**
Expand Down Expand Up @@ -160,7 +163,7 @@ public Collection<ClusterNode> topologyNodes() {
*/
public void topologySnapshot(long topVer, Collection<ClusterNode> topSnapshot) {
this.topVer = topVer;
this.topSnapshot = topSnapshot;
this.topSnapshot = ExternalizableTcpDiscoveryNode.of(topSnapshot);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@

package org.apache.ignite.events;

import java.io.Externalizable;
import java.io.Serializable;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.spi.discovery.tcp.internal.ExternalizableTcpDiscoveryNode;
import org.jetbrains.annotations.Nullable;

/**
Expand All @@ -38,7 +41,7 @@ public class EventAdapter implements Event {
/** */
private final long tstamp = U.currentTimeMillis();

/** */
/** Has to be {@link Externalizable} or {@link Serializable}. */
private ClusterNode node;

/** */
Expand Down Expand Up @@ -69,7 +72,7 @@ public EventAdapter(ClusterNode node, String msg, int type) {

A.ensure(type > 0, "Event type ID must be greater than zero.");

this.node = node;
node(node);
this.msg = msg;
this.type = type;
}
Expand Down Expand Up @@ -109,7 +112,7 @@ public EventAdapter(ClusterNode node, String msg, int type) {
* @param node Node.
*/
public void node(ClusterNode node) {
this.node = node;
this.node = ExternalizableTcpDiscoveryNode.of(node);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@

package org.apache.ignite.events;

import java.io.Externalizable;
import java.io.Serializable;
import java.util.UUID;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJobResultPolicy;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.spi.discovery.tcp.internal.ExternalizableTcpDiscoveryNode;
import org.jetbrains.annotations.Nullable;

/**
Expand Down Expand Up @@ -85,7 +88,7 @@ public class JobEvent extends EventAdapter {
/** */
private IgniteUuid jobId;

/** */
/** Has to be {@link Externalizable} or {@link Serializable}. */
private ClusterNode taskNode;

/** */
Expand Down Expand Up @@ -210,7 +213,7 @@ public ClusterNode taskNode() {
* @param taskNode Node where parent task of the job has originated.
*/
public void taskNode(ClusterNode taskNode) {
this.taskNode = taskNode;
this.taskNode = ExternalizableTcpDiscoveryNode.of(taskNode);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public DirectMessageReader(final MessageFactory msgFactory, IgniteCacheObjectPro
this.msgFactory = msgFactory;
this.cacheObjProc = cacheObjProc;

state = new DirectMessageState<>(StateItem.class, new IgniteOutClosure<StateItem>() {
state = new DirectMessageState<>(StateItem.class, new IgniteOutClosure<>() {
@Override public StateItem apply() {
return new StateItem(msgFactory, cacheObjProc);
}
Expand Down Expand Up @@ -521,7 +521,7 @@ private static class StateItem implements DirectMessageStateItem {
* @param msgFactory Message factory.
* @param cacheObjProc Cache object processor.
*/
public StateItem(MessageFactory msgFactory, IgniteCacheObjectProcessor cacheObjProc) {
public StateItem(MessageFactory msgFactory, @Nullable IgniteCacheObjectProcessor cacheObjProc) {
stream = new DirectByteBufferStream(msgFactory, cacheObjProc);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,25 @@

package org.apache.ignite.internal.managers.discovery;

import java.util.function.Supplier;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.processors.cache.CacheStatisticsModeChangeMessage;
import org.apache.ignite.internal.processors.cache.CacheStatisticsModeChangeMessageSerializer;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacketSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.ClusterNodeCollectionMessage;
import org.apache.ignite.spi.discovery.tcp.messages.ClusterNodeCollectionMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.ClusterNodeMessage;
import org.apache.ignite.spi.discovery.tcp.messages.ClusterNodeMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.IgniteProductVersionMessage;
import org.apache.ignite.spi.discovery.tcp.messages.IgniteProductVersionMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage;
import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.InetSocketAddressMessage;
Expand All @@ -45,6 +58,10 @@
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequestSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponseSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCollectionMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCollectionMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
Expand All @@ -61,16 +78,21 @@
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMarshallableMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFullMetricsMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFullMetricsMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeMetricsMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeMetricsMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest;
Expand All @@ -83,11 +105,40 @@
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryServerOnlyCustomEventMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessageSerializer;
import org.jetbrains.annotations.Nullable;

/** Message factory for discovery messages. */
/**
* Message factory for discovery messages. Allows to create an enhanced {@link MessageFactory} allowing to create
* automated pre- and post- marshalling message serializer for {@link TcpDiscoveryMarshallableMessage}.
*/
public class DiscoveryMessageFactory implements MessageFactoryProvider {
/** Custom data marshaller. */
private final @Nullable Marshaller cstDataMarshall;

/** Class loader for the custom data marshalling. */
private final @Nullable ClassLoader cstDataMarshallClsLdr;

/**
* @param cstDataMarshall Custom data marshaller.
* @param cstDataMarshallClsLdr Class loader for the custom data marshalling.
*/
public DiscoveryMessageFactory(@Nullable Marshaller cstDataMarshall, @Nullable ClassLoader cstDataMarshallClsLdr) {
assert cstDataMarshall == null && cstDataMarshallClsLdr == null || cstDataMarshall != null && cstDataMarshallClsLdr != null;

this.cstDataMarshall = cstDataMarshall;
this.cstDataMarshallClsLdr = cstDataMarshallClsLdr;
}

/** {@inheritDoc} */
@Override public void registerAll(MessageFactory factory) {
factory = enhanceMessageFactory(factory);

// Utility messages.
factory.register((short)-112, TcpDiscoveryNodeMessage::new, new TcpDiscoveryNodeMessageSerializer());
factory.register((short)-111, ClusterNodeCollectionMessage::new, new ClusterNodeCollectionMessageSerializer());
factory.register((short)-110, ClusterNodeMessage::new, new ClusterNodeMessageSerializer());
factory.register((short)-109, IgniteProductVersionMessage::new, new IgniteProductVersionMessageSerializer());
factory.register((short)-108, TcpDiscoveryCollectionMessage::new, new TcpDiscoveryCollectionMessageSerializer());
factory.register((short)-107, NodeSpecificData::new, new NodeSpecificDataSerializer());
factory.register((short)-106, DiscoveryDataPacket::new, new DiscoveryDataPacketSerializer());
factory.register((short)-105, TcpDiscoveryNodeFullMetricsMessage::new,
Expand Down Expand Up @@ -123,8 +174,76 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider {
factory.register((short)21, TcpDiscoveryCustomEventMessage::new, new TcpDiscoveryCustomEventMessageSerializer());
factory.register((short)22, TcpDiscoveryServerOnlyCustomEventMessage::new,
new TcpDiscoveryServerOnlyCustomEventMessageSerializer());
factory.register((short)23, TcpDiscoveryClientReconnectMessage::new, new TcpDiscoveryClientReconnectMessageSerializer());
factory.register((short)24, TcpDiscoveryNodeAddedMessage::new, new TcpDiscoveryNodeAddedMessageSerializer());

// DiscoveryCustomMessage
factory.register((short)500, CacheStatisticsModeChangeMessage::new, new CacheStatisticsModeChangeMessageSerializer());
}

/**
* @return Enhanced {@link MessageFactory} allowing to create automated pre- and post- marshalling message serializer
* for {@link TcpDiscoveryMarshallableMessage}.
*/
private MessageFactory enhanceMessageFactory(MessageFactory mf) {
if (cstDataMarshall == null || cstDataMarshallClsLdr == null)
return mf;

return new MessageFactory() {
@Override public void register(
short directType,
Supplier<Message> supplier,
MessageSerializer serializer
) throws IgniteException {
if (supplier.get() instanceof TcpDiscoveryMarshallableMessage) {
final MessageSerializer serializer0 = serializer;

serializer = new MessageSerializer() {
private Message curMarshallableMsg;

@Override public boolean writeTo(Message msg, MessageWriter writer) {
if (msg instanceof TcpDiscoveryMarshallableMessage && curMarshallableMsg == null) {
curMarshallableMsg = msg;

((TcpDiscoveryMarshallableMessage)msg).prepareMarshal(cstDataMarshall);
}

boolean res = serializer0.writeTo(msg, writer);

if (res && curMarshallableMsg != null) {
assert msg instanceof TcpDiscoveryMarshallableMessage;

curMarshallableMsg = null;
}

return res;
}

@Override public boolean readFrom(Message msg, MessageReader reader) {
boolean res = serializer0.readFrom(msg, reader);

if (res && msg instanceof TcpDiscoveryMarshallableMessage)
((TcpDiscoveryMarshallableMessage)msg).finishUnmarshal(cstDataMarshall, cstDataMarshallClsLdr);

return res;
}
};
}

mf.register(directType, supplier, serializer);
}

@Override public void register(short directType, Supplier<Message> supplier) throws IgniteException {
mf.register(directType, supplier);
}

@Override public Message create(short type) {
return mf.create(type);
}

@Override public MessageSerializer serializer(short type) {
return mf.serializer(type);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.ignite.internal.processors.affinity;

import java.io.Externalizable;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -28,6 +29,7 @@
import java.util.UUID;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.spi.discovery.tcp.internal.ExternalizableTcpDiscoveryNode;

/**
* Cached affinity calculations.
Expand All @@ -44,7 +46,7 @@ public class GridAffinityAssignment implements AffinityAssignment, Serializable
/** Topology version. */
private final AffinityTopologyVersion topVer;

/** Collection of calculated affinity nodes. */
/** Collection of calculated affinity nodes. Has to be {@link Externalizable} or {@link Serializable}. */
private List<List<ClusterNode>> assignment;

/** Map of primary node partitions. */
Expand Down Expand Up @@ -89,7 +91,7 @@ public class GridAffinityAssignment implements AffinityAssignment, Serializable
assert idealAssignment != null;

this.topVer = topVer;
this.assignment = assignment;
assigment0(assignment);
this.idealAssignment = idealAssignment.equals(assignment) ? assignment : idealAssignment;

primary = new HashMap<>();
Expand All @@ -105,12 +107,19 @@ public class GridAffinityAssignment implements AffinityAssignment, Serializable
GridAffinityAssignment(AffinityTopologyVersion topVer, GridAffinityAssignment aff) {
this.topVer = topVer;

assignment = aff.assignment;
assigment0(aff.assignment);
idealAssignment = aff.idealAssignment;
primary = aff.primary;
backup = aff.backup;
}

/** */
private void assigment0(List<List<ClusterNode>> assignment) {
this.assignment = new ArrayList<>(assignment.size());

assignment.forEach(nodes -> this.assignment.add(ExternalizableTcpDiscoveryNode.of(nodes)));
}

/**
* @return Affinity assignment computed by affinity function.
*/
Expand Down
Loading