Skip to content

Conversation

@ivandika3
Copy link
Contributor

@ivandika3 ivandika3 commented Jan 17, 2026

What changes were proposed in this pull request?

This task is to come up with the basic implementation of follower read client proxy as a baseline before further performance improvements. The idea is to simply pick an OM node in random (which can be a leader or follower) and use it to submit read requests. The read requests need to keep sending to that OM node unless the OM is down which triggers failover. Write requests should be sent to the OM leader directly.

Further improvements such as followers affinity or picking OM based on latency, applied index, etc will be implemented in follow up tasks. The main focus of this patch is to ensure that long-lived client (e.g. S3G Ozone clients) will stick to the OM follower once it picks it as the current proxy. In the previous leader proxy provider implementation, the client only read from followers until a new write request triggers OMNotLeaderException and the failover causes proxy to always be pointing to the leader.

The implementation is to introduce HadoopRpcOMFollowerReadProxyProvider which wraps
HadoopRpcOMFailoverProxyProvider. FollowerReadProxyProvider tracks a different currentOmNodeId from HadoopRpcOMFailoverProxyProvider. FollowerReadInvocationHandler will check whether the request is a read request (using OmUtils#isReadOnly) and if so forwards it to its current proxy. If it's a write request, the request if forwarded to HadoopRpcOMFailoverProxyProvider to be sent to the leader.

So the proxy hierarchy (each with its own InvocationHandler) is

  • TracingUtil's proxy (InvocationHandler: TraceAllMethod)
    • RetryProxy (InvocationHandler: RetryInvocationHandler)
      • HadoopRpcOMFollowerReadProxyProvider (InvocationHandler: FollowerReadInvocationHandler)
        • ProtocolProxy (which is created in OMFailoverProxyProviderBase#createOMProxy): ProtobufRpcEngine.Invoker

What is the link to the Apache JIRA

https://issues.apache.org/jira/browse/HDDS-14379

How was this patch tested?

UT and IT.

Clean CI: https://github.com/ivandika3/ozone/actions/runs/21102966224

@ivandika3 ivandika3 marked this pull request as ready for review January 18, 2026 05:09
@ivandika3
Copy link
Contributor Author

cc: @greenwich

@ivandika3 ivandika3 self-assigned this Jan 18, 2026
@szetszwo
Copy link
Contributor

@ivandika3 , thanks for working on this! I am reviewing this. The change is quite big. Need some time.

@chungen0126 chungen0126 self-requested a review January 20, 2026 05:40
Copy link
Contributor

@szetszwo szetszwo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ivandika3 , thanks a lot for working on this and adding a lot of tests! Please see the comments inlined.

BTW, filed HDDS-14455 and HDDS-14470 for improving the current code.

Comment on lines +126 to 127
protected synchronized ProxyInfo<T> createOMProxyIfNeeded(OMProxyInfo<T> omProxyInfo) {
if (omProxyInfo.proxy == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The synchronized is for omProxyInfo.proxy. We probably should do it in OMProxyInfo. Let me fix it in HDDS-14470.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok will wait until HDDS-14470 is merged.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let merge this first if it is ready. I can update the change for HDDS-14470.

new HadoopRpcOMFailoverProxyProvider<>(configuration, ugi, omServiceId, protocol));
}

@SuppressWarnings("unchecked")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move it to wrappedProxy. Then it would only suppress the warning there but not the entire method.

+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFollowerReadFailoverProxyProvider.java
@@ -103,7 +103,6 @@ public HadoopRpcOMFollowerReadFailoverProxyProvider(
         new HadoopRpcOMFailoverProxyProvider<>(configuration, ugi, omServiceId, protocol));
   }

-  @SuppressWarnings("unchecked")
   public HadoopRpcOMFollowerReadFailoverProxyProvider(String omServiceId, Class<T> protocol,
       HadoopRpcOMFailoverProxyProvider<T> failoverProxy) throws IOException {
     this.protocolClass = protocol;
@@ -119,6 +118,7 @@ public HadoopRpcOMFollowerReadFailoverProxyProvider(String omServiceId, Class<T>
       combinedInfo.append(failoverProxy.getOMProxies().get(i).proxyInfo);
     }
     combinedInfo.append(']');
+    @SuppressWarnings("unchecked")
     T wrappedProxy = (T) Proxy.newProxyInstance(
         FollowerReadInvocationHandler.class.getClassLoader(),
         new Class<?>[] {protocol}, new FollowerReadInvocationHandler());

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, updated.

Comment on lines 114 to 121
StringBuilder combinedInfo = new StringBuilder("[");
for (int i = 0; i < failoverProxy.getOMProxies().size(); i++) {
if (i > 0) {
combinedInfo.append(',');
}
combinedInfo.append(failoverProxy.getOMProxies().get(i).proxyInfo);
}
combinedInfo.append(']');
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may use "map/reduce".

    final String combinedInfo = "[" + failoverProxy.getOMProxies().stream()
        .map(a -> a.proxyInfo)
        .reduce((a, b) -> a + ", " + b).orElse("") + "]";

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, updated.

}

@SuppressWarnings("unchecked")
public HadoopRpcOMFollowerReadFailoverProxyProvider(String omServiceId, Class<T> protocol,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For simplicity, let support only OzoneManagerProtocolPB for now? Then, we can remove the followerReadEnabled and simplify the code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review. Could you clarify on this?

Currently there are guards to ensure that follower read only supports OzoneManagerProtocolPB. One is during construction by disabling useFollowerRead if it's not OzoneManagerProtocolPB. Second is during FollowerReadInvocationHandler.

Do you mean to make HadoopRpcOMFollowerReadFailoverProxyProvider to be HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB> implements FailoverProxy<OzoneManagerProtocolPB>? I believe this might mean that we need to replace T generic type parameter with OzoneManagerProtocolPB. This adds verbosity and reduces the generic benefit, not sure if this is a good tradeoff for removing userFollowerRead.

Please let me know what you think.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it the case that it uses follower read only if protocol == OzoneManagerProtocolPB.class? If yes, we don't pass the protocol parameter.

Copy link
Contributor Author

@ivandika3 ivandika3 Jan 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok I understood. Since HadoopRpcOMFollowerReadFailoverProxyProvider FollowerReadInvocationhandler handles only submitRequest from OzoneManagerProtocolPB, we should simply make it to only handle OzoneManagerProtocolPB.

Comment on lines 168 to 179
private static OMRequest parseOMRequest(Object[] args) throws Throwable {
if (args == null || args.length < 2 || !(args[1] instanceof Message)) {
LOG.error("Request failed since OM request is null and cannot be parsed");
// Throws a non-retriable exception to prevent retry and failover
// See the HddsUtils#shouldNotFailoverOnRpcException used in
// OMFailoverProxyProviderBase#shouldFailover
throw wrapInServiceException(
new RpcNoSuchProtocolException("OM request is null and cannot be parsed"));
}
final Message theRequest = (Message) args[1];
return (OMRequest) theRequest;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's provide a more specific error message:

  private static OMRequest parseOMRequest(Object[] args) throws ServiceException {
    final String error = args == null ? "args == null"
        : args.length < 2 ? "args.length == " + args.length + " < 2"
        : !(args[1] instanceof OMRequest) ? "Non-OMRequest: " + args[1].getClass()
        : null;
    if (error != null) {
      // Throws a non-retriable exception to prevent retry and failover
      // See the HddsUtils#shouldNotFailoverOnRpcException used in
      // OMFailoverProxyProviderBase#shouldFailover
      throw wrapInServiceException(new RpcNoSuchProtocolException("Failed to parseOMRequest: " + error));
    }
    return (OMRequest) args[1];
  }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, updated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated this to handle null args[1]. I also changed this to use normal if else blocks since I personally find chained ternary operators hard to read.

public Object invoke(Object proxy, final Method method, final Object[] args)
throws Throwable {
lastProxy = null;
if (method.getDeclaringClass() == Object.class) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check (method.getDeclaringClass() != OzoneManagerProtocolPB.class) instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, better. Updated, thanks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reverted this due to failing test.

The actual Method#getDeclaringClass is org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos$OzoneManagerService$BlockingInterface so all submitRequest will go to this block.

Copy link
Contributor Author

@ivandika3 ivandika3 Jan 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Additionally, I tried to change the proxy to be pass through if it's not OzoneManagerProtocolPB.class

From

return method.invoke(this, args);

to

return method.invoke(proxy, args);

However, it seems because org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos$OzoneManagerService$BlockingInterface is not concrete class, it throws exception when usineg Object#toString.

Caused by: java.lang.NoSuchMethodException: org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB.toString()

Not sure if there is a way to handle all cases.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for trying it! Let's keep using Object.class.

Comment on lines 407 to 412
private static Throwable wrapInServiceException(Throwable e) {
if (e instanceof ServiceException) {
return e;
}
return new ServiceException(e);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throws ServiceException.

  private static void throwServiceException(Throwable e) throws ServiceException {
    throw e instanceof ServiceException ? (ServiceException) e : new ServiceException(e);
  }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, updated. However, retVal needs to be initialized to null since the compiler cannot detect that throwServiceException always throws exception (since it might think that it simply returns). So I have initialized retVal to null.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then, we may return ServiceException. Anyway, both are about the same.

  private static ServiceException getServiceException(Throwable e) {
    return e instanceof ServiceException ? (ServiceException) e : new ServiceException(e);
  }

*
* @return parsed OM request.
*/
private static OMRequest parseOMRequest(Object[] args) throws Throwable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throws ServiceException instead of Throwable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, updated.

* Whether reading from follower is enabled. If this is false, all read
* requests will still go to OM leader.
*/
private volatile boolean followerReadEnabled;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's rename it to useFollowerRead to avoid confusion with the conf.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, updated.

OMRequest omRequest = parseOMRequest(args);
if (followerReadEnabled && OmUtils.shouldSendToFollower(omRequest)) {
int failedCount = 0;
for (int i = 0; i < failoverProxy.getOmNodesInOrder().size(); i++) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check followerReadEnabled: followerReadEnabled && i < failoverProxy.getOmNodesInOrder().size()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, updated.

Copy link
Contributor Author

@ivandika3 ivandika3 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@szetszwo Thanks for the review.

Comment on lines +126 to 127
protected synchronized ProxyInfo<T> createOMProxyIfNeeded(OMProxyInfo<T> omProxyInfo) {
if (omProxyInfo.proxy == null) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok will wait until HDDS-14470 is merged.

* Whether reading from follower is enabled. If this is false, all read
* requests will still go to OM leader.
*/
private volatile boolean followerReadEnabled;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, updated.

new HadoopRpcOMFailoverProxyProvider<>(configuration, ugi, omServiceId, protocol));
}

@SuppressWarnings("unchecked")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, updated.

Comment on lines 114 to 121
StringBuilder combinedInfo = new StringBuilder("[");
for (int i = 0; i < failoverProxy.getOMProxies().size(); i++) {
if (i > 0) {
combinedInfo.append(',');
}
combinedInfo.append(failoverProxy.getOMProxies().get(i).proxyInfo);
}
combinedInfo.append(']');
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, updated.

*
* @return parsed OM request.
*/
private static OMRequest parseOMRequest(Object[] args) throws Throwable {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, updated.

Comment on lines 168 to 179
private static OMRequest parseOMRequest(Object[] args) throws Throwable {
if (args == null || args.length < 2 || !(args[1] instanceof Message)) {
LOG.error("Request failed since OM request is null and cannot be parsed");
// Throws a non-retriable exception to prevent retry and failover
// See the HddsUtils#shouldNotFailoverOnRpcException used in
// OMFailoverProxyProviderBase#shouldFailover
throw wrapInServiceException(
new RpcNoSuchProtocolException("OM request is null and cannot be parsed"));
}
final Message theRequest = (Message) args[1];
return (OMRequest) theRequest;
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, updated.

public Object invoke(Object proxy, final Method method, final Object[] args)
throws Throwable {
lastProxy = null;
if (method.getDeclaringClass() == Object.class) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, better. Updated, thanks.

OMRequest omRequest = parseOMRequest(args);
if (followerReadEnabled && OmUtils.shouldSendToFollower(omRequest)) {
int failedCount = 0;
for (int i = 0; i < failoverProxy.getOmNodesInOrder().size(); i++) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, updated.

Comment on lines 407 to 412
private static Throwable wrapInServiceException(Throwable e) {
if (e instanceof ServiceException) {
return e;
}
return new ServiceException(e);
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, updated. However, retVal needs to be initialized to null since the compiler cannot detect that throwServiceException always throws exception (since it might think that it simply returns). So I have initialized retVal to null.

}

@SuppressWarnings("unchecked")
public HadoopRpcOMFollowerReadFailoverProxyProvider(String omServiceId, Class<T> protocol,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review. Could you clarify on this?

Currently there are guards to ensure that follower read only supports OzoneManagerProtocolPB. One is during construction by disabling useFollowerRead if it's not OzoneManagerProtocolPB. Second is during FollowerReadInvocationHandler.

Do you mean to make HadoopRpcOMFollowerReadFailoverProxyProvider to be HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB> implements FailoverProxy<OzoneManagerProtocolPB>? I believe this might mean that we need to replace T generic type parameter with OzoneManagerProtocolPB. This adds verbosity and reduces the generic benefit, not sure if this is a good tradeoff for removing userFollowerRead.

Please let me know what you think.

# Conflicts:
#	hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
#	hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/OmTestUtil.java
#	hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java
#	hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
Copy link
Contributor

@szetszwo szetszwo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 the change looks good.

Comment on lines +126 to 127
protected synchronized ProxyInfo<T> createOMProxyIfNeeded(OMProxyInfo<T> omProxyInfo) {
if (omProxyInfo.proxy == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let merge this first if it is ready. I can update the change for HDDS-14470.

}

@SuppressWarnings("unchecked")
public HadoopRpcOMFollowerReadFailoverProxyProvider(String omServiceId, Class<T> protocol,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it the case that it uses follower read only if protocol == OzoneManagerProtocolPB.class? If yes, we don't pass the protocol parameter.

public Object invoke(Object proxy, final Method method, final Object[] args)
throws Throwable {
lastProxy = null;
if (method.getDeclaringClass() == Object.class) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for trying it! Let's keep using Object.class.

@szetszwo szetszwo merged commit a698f48 into apache:master Jan 22, 2026
44 checks passed
@ivandika3
Copy link
Contributor Author

Thanks @szetszwo for the in-depth reviews.

@greenwich
Copy link
Contributor

Hi, thanks for CC-ing me. Great MR! Thank you!

Unfortunately, I am late for the review. A few more thoughts going forward:

  1. Add OMFollowerReadMetrics to track the follower read-specific metrics to monitor its effectiveness, health issues, and measure performance.
  2. As you mentioned in the comments, using the simple round-robin routing without checking node roles might not be ideal. Since OM already exposes OmRoleInfo via ServiceInfo, we can leverage it.
  • We can either cache role information (periodically refresh OM roles in the background),
  • or check the role per request
    Both approaches have their own pros and cons.
  1. Read after write consistency. Let's imagine the following scenario:
  • Client writes/updates "abc" to OM leader (index 10)
  • Same client reads key "abc" from follower (index 9)
  • Client gets "key not found" error or inconsistent data.
    Does the client need to track the applied index? (This may require unit/integration test first).
  1. Good to see the performance benchmarks about read throughput, latency, etc.

@ivandika3
Copy link
Contributor Author

ivandika3 commented Jan 23, 2026

Thanks @greenwich for taking a look at this

Add OMFollowerReadMetrics to track the follower read-specific metrics to monitor its effectiveness, health issues, and measure performance.

We have client-side metrics (e.g. S3G metrics) to see the performance. Of course, we can add more if needed.

As you mentioned in the comments, using the simple round-robin routing without checking node roles might not be ideal. Since OM already exposes OmRoleInfo via ServiceInfo, we can leverage it.

Yes, this is the planned improvements. As you said, there are two possible implementations (each with its own pros and cons)

  1. We can periodically refresh OM roles in the background and cache the OM roles
    • Pros: The OM role refresh is not in the read critical path and will not introduce latency increase
    • Cons
      • The cached OM roles can be stale depending on the background service interval and the latency
      • Adding background service might send unnecessary RPCs for idle client (higher number of clients will generate a lot of these RPCs). Ideally, we only need to send the RPCs when we actually need it.
      • We also need to decide how do we check the OM service status: Do we want to send getServiceList to the leader (which has a more complete view of the Raft group) or we send getRoleInfo from each of the OM nodes (which might have a more detailed information)
  2. We can send an checkRole RPC per request or after every failover
    • Pros: Most up-to-date data
    • Cons
      • Higher latency due to the additional RPC
      • Higher number of RPCs (2x the number of read requests)

Due to this, we need to implement and benchmark to find the correct tradeoff or better solution.

Personally, I wonder whether we can implement ones that do not need to have these extra RPCs. Maybe we can let the follower to simply send request and change proxy if it feels that the current proxy is not performant enough. Or OM leader can have a mechanism to tell client to failover.

Read after write consistency.

This should be transparent to clients and guaranteed by the Ratis linearizable read using the Raft ReadIndex protocol, so client does not need to have a custom logic to handle this. I have added TestOzoneManagerHAFollowerReadWithAllRunning#testLinearizableReadConsistency to test the consistency.

I tried to implement similar read-your-own-write implementation inspired by the HDFS observer read which makes the client and server to exchanges the appliedIndex (https://issues.apache.org/jira/browse/HDDS-13933). However, it is not good enough for our use case since unless we call msync before every call, client can still read stale data. Additionally S3 requests from a single client can be redirected to the S3G pods which does not have any knowledge of the previous S3 request from the client. This means that if the S3G pod simply reads from the OM follower, it might be served stale data. The msync idea is similar to Raft ReadIndex, but instead of client talking asking the leader to the latest commit/applied index, Raft ReadIndex mechanism will make the follower asks for the leader for the latest commit/applied index (i.e. ReadIndex).

Good to see the performance benchmarks about read throughput, latency, etc.

I'm working on this. Will share once the benchmarks result are out.

@greenwich If you are interested, please help us to benchmark from your side or suggest improvements.

@szetszwo
Copy link
Contributor

@greenwich , thanks for the suggested improvements! We should continue working on the improvements.

BTW, I also have submitted #9661 for improving the data structures.

@greenwich
Copy link
Contributor

@ivandika3, sure, benchmarking sounds interesting.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants