Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
package org.apache.hadoop.hdds.scm.ha;

import com.google.common.base.Preconditions;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumMap;
Expand Down Expand Up @@ -218,26 +216,7 @@ public boolean triggerSnapshot() throws IOException {
}

private Message process(final SCMRatisRequest request) throws Exception {
try {
final Object handler = handlers.get(request.getType());

if (handler == null) {
throw new IOException(
"No handler found for request type " + request.getType());
}

final Object result = handler.getClass()
.getMethod(request.getOperation(),
request.getParameterTypes())
.invoke(handler, request.getArguments());

return SCMRatisResponse.encode(result);
} catch (NoSuchMethodException | SecurityException ex) {
throw new InvalidProtocolBufferException(ex.getMessage());
} catch (InvocationTargetException e) {
final Throwable target = e.getTargetException();
throw target instanceof Exception ? (Exception) target : e;
}
return SCMStateMachine.process(request, handlers.get(request.getType()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
*/
public final class SCMRatisRequest {

static final ScmCodecFactory FACTORY = ScmCodecFactory.getInstance();
private final RequestType type;
private final String operation;
private final Object[] arguments;
Expand Down Expand Up @@ -96,20 +97,18 @@ public Message encode() throws InvalidProtocolBufferException {
final Method.Builder methodBuilder = Method.newBuilder();
methodBuilder.setName(operation);

final List<MethodArgument> args = new ArrayList<>();

int paramCounter = 0;
for (Object argument : arguments) {
final MethodArgument.Builder argBuilder = MethodArgument.newBuilder();
for (int i = 0; i < parameterTypes.length; i++) {
// Set actual method parameter type, not actual argument type.
// This is done to avoid MethodNotFoundException in case if argument is
// subclass type, where as method is defined with super class type.
argBuilder.setType(parameterTypes[paramCounter++].getName());
argBuilder.setValue(ScmCodecFactory.getCodec(argument.getClass())
.serialize(argument));
args.add(argBuilder.build());
final Class<?> parameterType = parameterTypes[i];
final Class<?> resolved = FACTORY.resolve(parameterType);

methodBuilder.addArgs(MethodArgument.newBuilder()
.setType(parameterType.getName())
.setValue(FACTORY.getCodec(resolved).serialize(arguments[i]))
.build());
}
methodBuilder.addAllArgs(args);
requestProtoBuilder.setMethod(methodBuilder.build());
final SCMRatisRequestProto requestProto = requestProtoBuilder.build();
return Message.valueOf(requestProto.toByteString());
Expand Down Expand Up @@ -149,15 +148,10 @@ public static SCMRatisRequest decode(Message message)
if (!argument.hasValue()) {
throw new InvalidProtocolBufferException("Missing argument value");
}
try {
final Class<?> clazz = ReflectionUtil.getClass(argument.getType());
parameterTypes[paramCounter++] = clazz;
args.add(ScmCodecFactory.getCodec(clazz)
.deserialize(argument.getValue()));
} catch (ClassNotFoundException ex) {
throw new InvalidProtocolBufferException(argument.getType() +
" cannot be decoded!" + ex.getMessage());
}
final Class<?> parameterType = FACTORY.getClass(argument.getType());
final Class<?> clazz = FACTORY.resolve(parameterType);
parameterTypes[paramCounter++] = parameterType;
args.add(FACTORY.getCodec(clazz).deserialize(argument.getValue()));
}
return new SCMRatisRequest(requestProto.getType(),
method.getName(), parameterTypes, args.toArray());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
* Represents the response from RatisServer.
*/
public final class SCMRatisResponse {
static final ScmCodecFactory FACTORY = ScmCodecFactory.getInstance();

private final boolean success;
private final Object result;
Expand Down Expand Up @@ -65,17 +66,18 @@ public Exception getException() {
return exception;
}

public static Message encode(final Object result)
public static Message encode(Object result, Class<?> type)
throws InvalidProtocolBufferException {

if (result == null) {
return Message.EMPTY;
}

final Class<?> type = result.getClass();
final Class<?> resolved = FACTORY.resolve(type);

final SCMRatisResponseProto response = SCMRatisResponseProto.newBuilder()
.setType(type.getName())
.setValue(ScmCodecFactory.getCodec(type).serialize(result))
.setValue(FACTORY.getCodec(resolved).serialize(result))
.build();
return Message.valueOf(UnsafeByteOperations.unsafeWrap(response.toByteString().asReadOnlyByteBuffer()));
}
Expand All @@ -102,14 +104,8 @@ public static SCMRatisResponse decode(RaftClientReply reply)
throw new InvalidProtocolBufferException("Missing response value");
}

try {
final Class<?> type = ReflectionUtil.getClass(responseProto.getType());
return new SCMRatisResponse(ScmCodecFactory.getCodec(type)
.deserialize(responseProto.getValue()));
} catch (ClassNotFoundException e) {
throw new InvalidProtocolBufferException(responseProto.getType() +
" cannot be decoded!" + e.getMessage());
}
final Class<?> type = FACTORY.resolve(responseProto.getType());
return new SCMRatisResponse(FACTORY.getCodec(type).deserialize(responseProto.getValue()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.EnumMap;
import java.util.List;
Expand Down Expand Up @@ -178,18 +179,19 @@ public CompletableFuture<Message> applyTransaction(
}

private Message process(final SCMRatisRequest request) throws Exception {
try {
final Object handler = handlers.get(request.getType());
return process(request, handlers.get(request.getType()));
}

public static Message process(final SCMRatisRequest request, Object handler) throws Exception {
try {
if (handler == null) {
throw new IOException("No handler found for request type " +
request.getType());
}

final Object result = handler.getClass().getMethod(
request.getOperation(), request.getParameterTypes())
.invoke(handler, request.getArguments());
return SCMRatisResponse.encode(result);
final Method method = handler.getClass().getMethod(request.getOperation(), request.getParameterTypes());
final Object result = method.invoke(handler, request.getArguments());
return SCMRatisResponse.encode(result, method.getReturnType());
} catch (NoSuchMethodException | SecurityException ex) {
throw new InvalidProtocolBufferException(ex.getMessage());
} catch (InvocationTargetException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,15 @@
import com.google.protobuf.ProtocolMessageEnum;
import java.math.BigInteger;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntFunction;
import org.apache.commons.lang3.ClassUtils;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DeletedBlocksTransactionSummary;
Expand All @@ -49,9 +48,12 @@
*/
public final class ScmCodecFactory {

private static Map<Class<?>, ScmCodec<?>> codecs = new HashMap<>();
private final Map<Class<?>, ScmCodec<?>> codecs = new HashMap<>();
private final Map<String, Class<?>> classes = new ConcurrentHashMap<>();
private final ClassResolver resolver;
private static final ScmCodecFactory INSTANCE = new ScmCodecFactory();

static {
private ScmCodecFactory() {
putProto(ContainerID.getDefaultInstance());
putProto(PipelineID.getDefaultInstance());
putProto(Pipeline.getDefaultInstance());
Expand All @@ -74,35 +76,55 @@ public final class ScmCodecFactory {
putEnum(NodeType.class, NodeType::forNumber);

// Must be the last one
final ClassResolver resolver = new ClassResolver(codecs.keySet());
resolver = new ClassResolver(codecs.keySet());
codecs.put(List.class, new ScmListCodec(resolver));
}

static <T extends Message> void putProto(T proto) {
private <T extends Message> void putProto(T proto) {
final Class<? extends Message> clazz = proto.getClass();
codecs.put(clazz, new ScmNonShadedGeneratedMessageCodec<>(clazz.getSimpleName(), proto.getParserForType()));
}

static <T extends Enum<T> & ProtocolMessageEnum> void putEnum(
private <T extends Enum<T> & ProtocolMessageEnum> void putEnum(
Class<T> enumClass, IntFunction<T> forNumber) {
codecs.put(enumClass, new ScmEnumCodec<>(enumClass, forNumber));
}

private ScmCodecFactory() { }

public static ScmCodec getCodec(Class<?> type)
throws InvalidProtocolBufferException {
final List<Class<?>> classes = new ArrayList<>();
classes.add(type);
classes.addAll(ClassUtils.getAllSuperclasses(type));
classes.addAll(ClassUtils.getAllInterfaces(type));
Comment on lines -97 to -98
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think removing supertypes causes the following error:

org.apache.hadoop.hdds.scm.exceptions.SCMException: org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException: Codec not found for class java.util.ArrayList
	at org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler.translateException(SCMHAInvocationHandler.java:163)
	at org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler.invokeRatis(SCMHAInvocationHandler.java:113)
	at org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler.invoke(SCMHAInvocationHandler.java:72)
	at jdk.proxy2/jdk.proxy2.$Proxy41.addTransactionsToDB(Unknown Source)
	at org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.addTransactions(SCMDeletedBlockTransactionStatusManager.java:470)
	at org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl.addTransactions(DeletedBlockLogImpl.java:253)
	at org.apache.hadoop.ozone.shell.TestDeletedBlocksTxnShell.testGetDeletedBlockSummarySubcommand(TestDeletedBlocksTxnShell.java:178)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1312)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1843)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1808)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:188)
Caused by: org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException: Codec not found for class java.util.ArrayList
	at org.apache.hadoop.hdds.scm.ha.io.ScmCodecFactory.getCodec(ScmCodecFactory.java:107)
	at org.apache.hadoop.hdds.scm.ha.SCMRatisRequest.encode(SCMRatisRequest.java:107)
	at org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl.submitRequest(SCMRatisServerImpl.java:238)
	at org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler.invokeRatisServer(SCMHAInvocationHandler.java:121)
	at org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler.invokeRatis(SCMHAInvocationHandler.java:110)

Maybe we can fix it by changing ArrayList parameter to List in DeletedBlockLogStateManager and its implementation:

public interface DeletedBlockLogStateManager {
@Replicate
void addTransactionsToDB(ArrayList<DeletedBlocksTransaction> txs,
DeletedBlocksTransactionSummary summary) throws IOException;
@Replicate
void addTransactionsToDB(ArrayList<DeletedBlocksTransaction> txs) throws IOException;
@Replicate
void removeTransactionsFromDB(ArrayList<Long> txIDs, DeletedBlocksTransactionSummary summary)
throws IOException;
@Replicate
void removeTransactionsFromDB(ArrayList<Long> txIDs)
throws IOException;
@Deprecated
@Replicate
void increaseRetryCountOfTransactionInDB(ArrayList<Long> txIDs)
throws IOException;
@Deprecated
@Replicate
int resetRetryCountOfTransactionInDB(ArrayList<Long> txIDs)
throws IOException;

Declaring as List is good practice in any case.

Copy link
Copy Markdown
Contributor

@adoroszlai adoroszlai Mar 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor

@szetszwo szetszwo Mar 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adoroszlai , thanks for debugging the test failures!

We should put ArrayList to the factory for backward compatibility.

+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/ScmCodecFactory.java
@@ -75,7 +75,9 @@ public final class ScmCodecFactory {

     // Must be the last one
     final ClassResolver resolver = new ClassResolver(codecs.keySet());
-    codecs.put(List.class, new ScmListCodec(resolver));
+    final ScmListCodec listCodec = new ScmListCodec(resolver);
+    codecs.put(List.class, listCodec);
+    codecs.put(ArrayList.class, listCodec);
   }

for (Class<?> clazz : classes) {
if (codecs.containsKey(clazz)) {
return codecs.get(clazz);
}
public static ScmCodecFactory getInstance() {
return INSTANCE;
}

public Class<?> resolve(String className) throws InvalidProtocolBufferException {
return resolver.get(className);
}

public Class<?> resolve(Class<?> clazz) throws InvalidProtocolBufferException {
return resolver.get(clazz);
}

public ScmCodec getCodec(Class<?> resolved) throws InvalidProtocolBufferException {
final ScmCodec<?> codec = codecs.get(resolved);
if (codec != null) {
return codec;
}
throw new InvalidProtocolBufferException(
"Codec for " + type + " not found!");

throw new InvalidProtocolBufferException("Codec not found for " + resolved);
}

public Class<?> getClass(String className) throws InvalidProtocolBufferException {
final Class<?> found = classes.get(className);
if (found != null) {
return found;
}

final Class<?> clazz;
try {
clazz = Class.forName(className);
} catch (ClassNotFoundException e) {
throw new InvalidProtocolBufferException("Class not found for " + className, e);
}
classes.put(className, clazz);
return clazz;
}

/** Resolve the codec class from a given class. */
Expand All @@ -120,31 +142,37 @@ static class ClassResolver {
}

Class<?> get(String className) throws InvalidProtocolBufferException {
final Class<?> c = provided.get(className);
if (c != null) {
return c;
}
throw new InvalidProtocolBufferException("Class not found for " + className);
return get(null, className);
}

Class<?> get(Class<?> clazz) throws InvalidProtocolBufferException {
final String className = clazz.getName();
return get(clazz, clazz.getName());
}

private Class<?> get(Class<?> clazz, String className) throws InvalidProtocolBufferException {
Objects.requireNonNull(className, "className == null");
final Class<?> c = provided.get(className);
if (c != null) {
return c;
}

final Class<?> found = resolved.get(className);
if (found != null) {
return found;
}

if (clazz == null) {
clazz = getInstance().getClass(className);
}

for (Class<?> base : provided.values()) {
if (base.isAssignableFrom(clazz)) {
resolved.put(className, base);
return base;
}
}
throw new InvalidProtocolBufferException("Failed to resolve " + clazz);

throw new InvalidProtocolBufferException("Failed to resolve " + className);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ public ByteString serialize(Object object) throws InvalidProtocolBufferException
}

final Class<?> resolved = resolver.get(elements.get(0).getClass());
final ScmCodec<Object> elementCodec = ScmCodecFactory.getCodec(resolved);
final ScmCodec<Object> elementCodec = ScmCodecFactory.getInstance().getCodec(resolved);

final ListArgument.Builder builder = ListArgument.newBuilder()
.setType(resolved.getName());
for (Object e : elements) {
Expand All @@ -67,8 +68,10 @@ public Object deserialize(ByteString value) throws InvalidProtocolBufferExceptio
throw new InvalidProtocolBufferException(
"Missing ListArgument.type: " + argument);
}

final Class<?> elementClass = resolver.get(argument.getType());
final ScmCodec<?> elementCodec = ScmCodecFactory.getCodec(elementClass);

final ScmCodec<?> elementCodec = ScmCodecFactory.getInstance().getCodec(elementClass);
final List<Object> list = new ArrayList<>();
for (ByteString element : argument.getValueList()) {
list.add(elementCodec.deserialize(element));
Expand Down
Loading