Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
707153d
IGNITE-28207 MessageProcessor fail build for unknown messages without…
nizhikov Apr 29, 2026
02942a9
IGNITE-28207 MessageProcessor fail build for unknown messages without…
nizhikov Apr 29, 2026
dd9a7c0
IGNITE-28207 MessageProcessor fail build for unknown messages without…
nizhikov Apr 29, 2026
fe7521f
IGNITE-28207 MessageProcessor fail build for unknown messages without…
nizhikov Apr 29, 2026
86b3d00
IGNITE-28207 MessageProcessor fail build for unknown messages without…
nizhikov Apr 29, 2026
77deba6
Merge branch 'IGNITE-28626' into IGNITE-28627
nizhikov Apr 29, 2026
d191df8
IGNITE-28207 MessageProcessor fail build for unknown messages without…
nizhikov Apr 30, 2026
513b12a
Merge branch 'master' into IGNITE-28627
nizhikov Apr 30, 2026
886e938
IGNITE-28207 MessageProcessor fail build for unknown messages without…
nizhikov Apr 30, 2026
e9f156b
IGNITE-28207 MessageProcessor fail build for unknown messages without…
nizhikov Apr 30, 2026
7f9456e
IGNITE-28207 Handle unknown messages in discovery
nizhikov May 4, 2026
f293c9e
IGNITE-28207 Handle unknown messages in discovery
nizhikov May 4, 2026
b37b1b7
IGNITE-28207 Handle unknown messages in discovery
nizhikov May 4, 2026
87a5ec8
IGNITE-28207 Handle unknown messages in discovery
nizhikov May 4, 2026
48ff796
IGNITE-28207 Handle unknown messages in discovery
nizhikov May 4, 2026
2f76f3b
IGNITE-28207 Handle unknown messages in discovery
nizhikov May 4, 2026
d94fbea
IGNITE-28207 Handle unknown messages in discovery
nizhikov May 4, 2026
c543728
IGNITE-28207 Handle unknown messages in discovery
nizhikov May 4, 2026
f786597
IGNITE-28207 Handle unknown messages in discovery
nizhikov May 4, 2026
a19911a
Merge branch 'master' into IGNITE-28627
nizhikov May 6, 2026
b51eb1d
IGNITE-28207 Handle unknown messages in discovery
nizhikov May 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public IgniteMessageFactoryImpl(MessageFactoryProvider[] factories) {
Supplier<Message> supplier = msgSuppliers[directTypeToIndex(directType)];

if (supplier == null)
throw new IgniteException("Invalid message type: " + directType);
throw new UnknownMessageException(directType);

return supplier.get();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.managers.communication;

import org.apache.ignite.IgniteException;
import org.apache.ignite.plugin.extensions.communication.Message;

/**
* Exception to be thrown when unregistered class serialized or unknown message deserialized.
*/
public class UnknownMessageException extends IgniteException {
/** Serial version uid. */
private static final long serialVersionUID = 0L;

/** */
public static final String NO_REG_MSG = "No registration for class: %s";

/** */
public static final String INVALID_TYPE_MSG = "Invalid message type: %d";

/**
* @param directType Unknown direct type.
*/
public UnknownMessageException(short directType) {
super(String.format(INVALID_TYPE_MSG, directType));
}

/**
* @param clazz Unregistered class.
*/
public UnknownMessageException(Class<? extends Message> clazz) {
super(String.format(NO_REG_MSG, clazz.getSimpleName()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.managers.communication.UnknownMessageException;

/**
* Base class for all communication messages.
Expand Down Expand Up @@ -68,7 +69,7 @@ default short directType() {
Short type = REGISTRATIONS.get(clazz);

if (type == null)
throw new IgniteException("No registration for class " + clazz.getSimpleName());
throw new UnknownMessageException(clazz);

return type;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ private static void sleepEx(long millis, Runnable before, Runnable after) throws

spi.writeMessage(ses, req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));

TcpDiscoveryHandshakeResponse res = spi.readMessage(ses, ackTimeout0);
TcpDiscoveryHandshakeResponse res = spi.readHandshakeResponse(ses, ackTimeout0);

// Convert the addresses once.
Collection<InetSocketAddress> redirectAddrs = res.redirectAddresses();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
import org.apache.ignite.internal.managers.communication.UnknownMessageException;
import org.apache.ignite.internal.managers.discovery.DiscoveryServerOnlyCustomMessage;
import org.apache.ignite.internal.processors.configuration.distributed.DistributedBooleanProperty;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
Expand Down Expand Up @@ -1488,7 +1489,7 @@
// Handshake.
spi.writeMessage(ses, req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));

TcpDiscoveryHandshakeResponse res = spi.readMessage(ses, timeoutHelper.nextTimeoutChunk(ackTimeout0));
TcpDiscoveryHandshakeResponse res = spi.readHandshakeResponse(ses, timeoutHelper.nextTimeoutChunk(ackTimeout0));

if (msg instanceof TcpDiscoveryJoinRequestMessage) {
boolean ignore = false;
Expand Down Expand Up @@ -3462,7 +3463,8 @@
timeoutHelper.nextTimeoutChunk(ackTimeout0));
}

TcpDiscoveryHandshakeResponse res = spi.readMessage(ses, timeoutHelper.nextTimeoutChunk(ackTimeout0));
TcpDiscoveryHandshakeResponse res =
spi.readHandshakeResponse(ses, timeoutHelper.nextTimeoutChunk(ackTimeout0));

if (log.isDebugEnabled())
log.debug("Handshake response: " + res);
Expand Down Expand Up @@ -6605,7 +6607,7 @@
boolean srvSock;

try {
try {

Check warning on line 6610 in modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Extract this nested try block into a separate method.

See more on https://sonarcloud.io/project/issues?id=apache_ignite&issues=AZ30xB95GLmCnEzhiP7R&open=AZ30xB95GLmCnEzhiP7R&pullRequest=13094
// Set socket options.
spi.configureSocketOptions(sock);

Expand Down Expand Up @@ -6922,7 +6924,7 @@
long sockTimeout = spi.getEffectiveSocketTimeout(srvSock);

while (!isInterrupted()) {
try {

Check warning on line 6927 in modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Extract this nested try block into a separate method.

See more on https://sonarcloud.io/project/issues?id=apache_ignite&issues=AZ30xB95GLmCnEzhiP7S&open=AZ30xB95GLmCnEzhiP7S&pullRequest=13094
SecurityUtils.serializeVersion(1);

// Use inifinite timeout for accepting new messages.
Expand Down Expand Up @@ -7185,8 +7187,10 @@
", err=" + X.cause(e, ClassNotFoundException.class).getMessage() + ']');

// Always report marshalling errors.
boolean err = e.hasCause(ObjectStreamException.class) ||
(nodeAlive(nodeId) && spiStateCopy() == CONNECTED && !X.hasCause(e, IOException.class));
// Can receive unknown message on handshake. It's ok - must continue to try find proper port.
boolean err = e.hasCause(ObjectStreamException.class)
|| e.hasCause(UnknownMessageException.class)
|| (nodeAlive(nodeId) && spiStateCopy() == CONNECTED && !X.hasCause(e, IOException.class));

if (err)
LT.error(log, e, "Failed to read message [sock=" + sock + ", locNodeId=" + locNodeId +
Expand Down Expand Up @@ -7220,6 +7224,13 @@
}
}
}
catch (UnknownMessageException e) {
if (spi.ignite() instanceof IgniteEx) {
FailureProcessor failure = ((IgniteEx)spi.ignite()).context().failure();

failure.process(new FailureContext(SYSTEM_WORKER_TERMINATION, e));
}
}
finally {
if (clientMsgWrk != null) {
if (log.isDebugEnabled())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.direct.DirectMessageReader;
import org.apache.ignite.internal.direct.DirectMessageWriter;
import org.apache.ignite.internal.managers.communication.UnknownMessageException;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
Expand Down Expand Up @@ -124,6 +126,10 @@
out.flush();
}
catch (Exception e) {
// See Message#directType()
if (X.hasCause(e, UnknownMessageException.class))
throw e;

// Keep logic similar to `U.marshal(...)`.
if (e instanceof IgniteCheckedException)
throw (IgniteCheckedException)e;
Expand Down Expand Up @@ -194,6 +200,9 @@
return (T)msg;
}
catch (Exception e) {
if (e instanceof UnknownMessageException)

Check warning on line 203 in modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace the usage of the "instanceof" operator by a catch block.

See more on https://sonarcloud.io/project/issues?id=apache_ignite&issues=AZ30xB__GLmCnEzhiP7T&open=AZ30xB__GLmCnEzhiP7T&pullRequest=13094
throw e;

// Keep logic similar to `U.marshal(...)`.
if (e instanceof IgniteCheckedException)
throw (IgniteCheckedException)e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.communication.UnknownMessageException;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
Expand Down Expand Up @@ -108,6 +109,7 @@
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDuplicateIdMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryEnsureDelivery;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

Expand Down Expand Up @@ -2436,6 +2438,22 @@ protected Marshaller marshaller() {
return marsh;
}

/**
* On handshake it's OK to receive unknown message.
* Wrapping in {@link IgniteCheckedException} allows caller to handle case properly.
*/
TcpDiscoveryHandshakeResponse readHandshakeResponse(
TcpDiscoveryIoSession ses,
long timeout
) throws IOException, IgniteCheckedException {
try {
return readMessage(ses, timeout);
}
catch (UnknownMessageException e) {
throw new IgniteCheckedException(e);
}
}

/** {@inheritDoc} */
@Override public TcpDiscoverySpi setName(String name) {
super.setName(name);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.spi.discovery.tcp;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.AbstractFailureHandler;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.plugin.AbstractTestPluginProvider;
import org.apache.ignite.plugin.ExtensionRegistry;
import org.apache.ignite.plugin.PluginContext;
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;

import static org.apache.ignite.IgniteSystemProperties.IGNITE_DUMP_THREADS_ON_FAILURE;
import static org.apache.ignite.internal.managers.communication.UnknownMessageException.INVALID_TYPE_MSG;

/** */
public class DiscoveryDeserializationExceptionTest extends GridCommonAbstractTest {
/** */
private static final int MSG_DIRECT_TYPE = -32764;

/** */
private static final String ERR_MSG = String.format(INVALID_TYPE_MSG, MSG_DIRECT_TYPE);

/** */
private ListeningTestLogger lsnrLog;

/** */
private volatile int failNodeIdx;

/** */
private volatile CountDownLatch failureHandlerLatch;

/** */
private final LogListener errLsnr = LogListener.matches(ERR_MSG).build();

/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

if (getTestIgniteInstanceName(failNodeIdx).equals(igniteInstanceName)) {
cfg.setGridLogger(lsnrLog).setFailureHandler(new AbstractFailureHandler() {
@Override protected boolean handle(Ignite ignite, FailureContext fctx) {
assertEquals(FailureType.SYSTEM_WORKER_TERMINATION, fctx.type());
assertEquals(getTestIgniteInstanceName(failNodeIdx), ignite.configuration().getIgniteInstanceName());

assertNotNull(fctx.error());
assertEquals(ERR_MSG, fctx.error().getMessage());

failureHandlerLatch.countDown();

return true;
}
});
}
else
cfg.setPluginProviders(new NotRegisteredMessageProvider());

return cfg;
}

/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
stopAllGrids();

lsnrLog = new ListeningTestLogger(log);

lsnrLog.registerListener(errLsnr);
}

/** */
@Test
@WithSystemProperty(key = IGNITE_DUMP_THREADS_ON_FAILURE, value = "fasle")
public void testReadExceptionLogged() throws Exception {
failureHandlerLatch = new CountDownLatch(1);
failNodeIdx = 1;

startGrids(2);

// grid0 knows about NotRegisteredMessage.
// Expect grid1 fail to read it.
grid(0).context().discovery().sendCustomEvent(new NotRegisteredMessage(""));

assertTrue("Failure handler must be invoked", failureHandlerLatch.await(1, TimeUnit.MINUTES));
assertTrue("Error must be logged", errLsnr.check(30_000));
assertTrue(grid(failNodeIdx).context().invalid());
}

/** */
@Test
@WithSystemProperty(key = IGNITE_DUMP_THREADS_ON_FAILURE, value = "fasle")
public void testReadExceptionLoggedOnClient() throws Exception {
failureHandlerLatch = new CountDownLatch(1);
failNodeIdx = 3;

startGrids(2);

IgniteEx cli = startClientGrid(failNodeIdx);

Check warning on line 122 in modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryDeserializationExceptionTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this useless assignment to local variable "cli".

See more on https://sonarcloud.io/project/issues?id=apache_ignite&issues=AZ30xB2uGLmCnEzhiP7O&open=AZ30xB2uGLmCnEzhiP7O&pullRequest=13094

Check warning on line 122 in modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryDeserializationExceptionTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this unused "cli" local variable.

See more on https://sonarcloud.io/project/issues?id=apache_ignite&issues=AZ30xB2uGLmCnEzhiP7P&open=AZ30xB2uGLmCnEzhiP7P&pullRequest=13094

// grid0 knows about NotRegisteredMessage.
// Expect client node fail to read it.
grid(0).context().discovery().sendCustomEvent(new NotRegisteredMessage(""));

assertTrue("Error must be logged", errLsnr.check(30_000));
}

/** */
public static class NotRegisteredMessageProvider extends AbstractTestPluginProvider {
/** {@inheritDoc} */
@Override public String name() {
return getClass().getName();
}

/** {@inheritDoc} */
@Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) {
registry.registerExtension(MessageFactoryProvider.class, (factory) ->

Check warning on line 140 in modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryDeserializationExceptionTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove the parentheses around the "factory" parameter

See more on https://sonarcloud.io/project/issues?id=apache_ignite&issues=AZ30xB2uGLmCnEzhiP7Q&open=AZ30xB2uGLmCnEzhiP7Q&pullRequest=13094
factory.register(MSG_DIRECT_TYPE, NotRegisteredMessage::new, new NotRegisteredMessageSerializer())
);
}
}
}
Loading
Loading