Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.apache.pinot.core.query.utils.rewriter.ResultRewriterFactory;
import org.apache.pinot.core.routing.MultiClusterRoutingContext;
import org.apache.pinot.core.transport.ListenerConfig;
import org.apache.pinot.core.transport.NettyInspector;
import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
import org.apache.pinot.core.util.ListenerConfigUtil;
import org.apache.pinot.core.util.trace.ContinuousJfrStarter;
Expand Down Expand Up @@ -218,6 +219,8 @@ public void init(PinotConfiguration brokerConf)
Helix.PREFIX_OF_BROKER_INSTANCE, _instanceId);

_brokerConf.setProperty(Broker.CONFIG_OF_BROKER_ID, _instanceId);

NettyInspector.logAllChecks();
}

/// Can be overridden to apply custom configs to the broker conf.
Expand Down Expand Up @@ -523,6 +526,8 @@ public void start()

_defaultClusterConfigChangeHandler.registerClusterConfigChangeListener(ContinuousJfrStarter.INSTANCE);

NettyInspector.registerMetrics(_brokerMetrics);

LOGGER.info("Finish starting Pinot broker");
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.transport;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.pinot.common.metrics.AbstractMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/// Utility class to inspect Netty constants and log their values, with the ability to check for specific conditions
/// and log warnings if they are not met.
public class NettyInspector {
private static final Logger LOGGER = LoggerFactory.getLogger(NettyInspector.class);
/// We use a CopyOnWriteArrayList to allow dynamic addition of checks at runtime, if needed.
public static final CopyOnWriteArrayList<Check> CHECKS;
/// We use a CopyOnWriteArrayList to allow dynamic addition of Netty instances at runtime,
/// if needed (e.g. if we want to support other shaded versions of Netty).
public static final CopyOnWriteArrayList<NettyInstance> KNOWN_INSTANCES;
Comment on lines +37 to +40
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get why these need to be copy on write arrays? They aren't mutated after creation, no?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Third-party distributions may include their own Netty copies as well. In StarTree, we don't do that, but I don't know about other distributors. A copy-on-write structure is thread-safe and very cheap to read (it incurs a cost when writing). Given we mostly only going to use it to read from it (as you said, probably nobody is going to modify it later), I think this is the best solution for our use case.


static {
CHECKS = new CopyOnWriteArrayList<>(
new Check[] {
NettyInspector::checkDirectMemory
}
);
KNOWN_INSTANCES = new CopyOnWriteArrayList<>(new NettyInstance[] {
new NettyInstance.UnshadedNettyInstance(),
new NettyInstance.GrpcNettyInstance()
});
}

private NettyInspector() {
// Private constructor to prevent instantiation
}

public static void registerMetrics(AbstractMetrics<?, ?, ?, ?> metrics) {
for (NettyInstance instance : KNOWN_INSTANCES) {
metrics.setOrUpdateGauge(instance.getName() + "NettyDirectMemoryUsed",
instance::getUsedDirectMemory);
metrics.setOrUpdateGauge(instance.getName() + "NettyDirectMemoryMax",
instance::getMaxDirectMemory);
}
}

/// Logs the values of all constants for all Netty instances, and logs warnings if any checks fail.
public static void logAllChecks() {
for (Map.Entry<NettyInstance, List<CheckResult>> entry : checkAllConstants().entrySet()) {
NettyInstance instance = entry.getKey();
List<CheckResult> results = entry.getValue();
for (CheckResult result : results) {
switch (result._status) {
case PASS:
// Do nothing
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is only being called once on startup, maybe we could include an info log?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be useful to expose this check as a REST endpoint so we can call it periodically in the future.

break;
case FAIL:
LOGGER.warn("Netty instance '{}' check failed: {}", instance.getName(), result._message);
break;
case UNKNOWN:
LOGGER.warn("Netty instance '{}' check unknown: {}", instance.getName(), result._message);
break;
default:
LOGGER.warn("Netty instance '{}' check returned unexpected status: {}", instance.getName(), result._status);
break;
}
}
}
}

/// Checks all constants for all Netty instances and returns a map of instances to their check results.
public static Map<NettyInstance, List<CheckResult>> checkAllConstants() {
Map<NettyInstance, List<CheckResult>> results = new HashMap<>();
for (NettyInstance instance : KNOWN_INSTANCES) {
for (Check check : CHECKS) {
CheckResult result = check.apply(instance);
results.computeIfAbsent(instance, k -> new ArrayList<>()).add(result);
}
}
return results;
}

public static CheckResult checkDirectMemory(NettyInstance instance) {
if (instance.isExplicitTryReflectionSetAccessible()) {
return CheckResult.SUCCESS;
} else {
String message = "Reflection access is disabled on the " + instance.getName() + " Netty instance. "
+ "Netty will probably use heap memory instead off-heap. "
+ "It is recommended to set -D" + instance.getShadePrefix()
+ "io.netty.tryReflectionSetAccessible=true.";
return new CheckResult(message, CheckResult.Status.FAIL);
}
}

public interface Check extends Function<NettyInstance, CheckResult> {
/// Applies the check to the given Netty instance.
/// @param nettyInstance Netty instance to check
/// @return CheckResult indicating success, failure or unknown
@Override
CheckResult apply(NettyInstance nettyInstance);
}

public static class CheckResult {
public static final CheckResult SUCCESS = new CheckResult(null, Status.PASS);
@Nullable
private final String _message;
private final Status _status;

public CheckResult(@Nullable String message, Status status) {
_message = message;
_status = status;
}

public enum Status {
PASS,
FAIL,
UNKNOWN
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pinot.core.transport;

import java.lang.reflect.Constructor;

/// Represents an instance of Netty, allowing access to certain static properties via reflection, with support for
/// shaded Netty versions.
///
/// We know 2 common Netty instances:
/// - Unshaded Netty, which uses the standard `io.netty` package
/// - gRPC-shaded Netty, shaded by gRPC and included as a dependency. It uses `io.grpc.netty.shaded.io.netty` package.
///
/// This is important because Netty defines is not designed to be shaded, and it uses some static attributes to
/// determine its behavior, specially whether it can use `Unsafe` or not or how much memory to allocate for direct
/// buffers.
/// These attributes are set using JAVA_OPTs. Each shaded version uses different JAVA_OPT properties. If we forget to
/// set one of these properties for a shaded version, that Netty _instance_ will fall back to some default behavior that
/// may not be optimal, and we won't have any indication of that happening.
///
/// Given we do not shade Netty our-selves, we can access the different copies of the Netty classes without using
/// reflection. [NettyInstance] provides an abstraction to access the different Netty instances and their properties in
/// a simple way.
///
/// **It is critical to not shade this class**, otherwise the literals used for reflection
/// (ie `io.netty.util.internal.PlatformDependent`) will be shaded too, so instead of looking for
/// `io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent`, as you may think reading this class, the shaded
/// version of this class will look for
/// `io.grpc.netty.shaded.org.apache.pinot.shaded.io.netty.util.internal.PlatformDependent`.
/// At the moment this is written, Pinot _does not_ shade Netty, so it is safe. Just to be sure, this class should be
/// excluded in the maven shade plugin configuration (see pom.xml on the root of the project).
public abstract class NettyInstance {
private static final Constructor<DummyClass> CONSTRUCTOR;

static {
try {
CONSTRUCTOR = DummyClass.class.getConstructor();
} catch (NoSuchMethodException e) {
throw new RuntimeException("This should never happen, DummyClass has a default constructor", e);
}
}

/// The name of the Netty instance. It will be used on logs but also on metric names, so it should be
/// something short and without spaces like "Unshaded", "Pinot", "gRPC".
/// Add underscores if you need to separate words, but avoid other special characters.
public abstract String getName();

public abstract String getShadePrefix();

public abstract boolean isExplicitTryReflectionSetAccessible();

public abstract long getUsedDirectMemory();

public abstract long getMaxDirectMemory();

public static class UnshadedNettyInstance extends NettyInstance {
@Override
public String getName() {
return "unshaded";
}

@Override
public String getShadePrefix() {
return "";
}

@Override
public boolean isExplicitTryReflectionSetAccessible() {
return io.netty.util.internal.ReflectionUtil.trySetAccessible(CONSTRUCTOR, true) == null;
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still tricky: instead of calling a getter (which is private in Netty), we call the method they use to check whether they can call the private constructors of ByteBuffers. As long as Netty keeps using the same code (which seems very likely), this should be a safe, reflection-free way to know whether Netty can use offheap buffers or not.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a neat hack lol, but there's no way we can be sure that Netty will continue using the same method to decide whether offheap buffers can be used in future versions right?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, but I think that is very unlikely. I can imagine in the future Netty may create an alternative that uses the new Arenas introduced in foreign function and memory (FFM). In that case, they may change this code, but I don't think they are going to do so soon.

Anyway, the alternative is to use reflection, and that is even less stable.

}

@Override
public long getUsedDirectMemory() {
return io.netty.util.internal.PlatformDependent.usedDirectMemory();
}

@Override
public long getMaxDirectMemory() {
return io.netty.util.internal.PlatformDependent.maxDirectMemory();
}
}

public static class GrpcNettyInstance extends NettyInstance {
@Override
public String getName() {
return "gRPC";
}

@Override
public String getShadePrefix() {
return "io.grpc.netty.shaded.";
}

@Override
public boolean isExplicitTryReflectionSetAccessible() {
return io.grpc.netty.shaded.io.netty.util.internal.ReflectionUtil.trySetAccessible(CONSTRUCTOR, true) == null;
}

@Override
public long getUsedDirectMemory() {
return io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent.usedDirectMemory();
}

@Override
public long getMaxDirectMemory() {
return io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent.maxDirectMemory();
}
}

private static class DummyClass {
public DummyClass() {
// This does nothing, but it is used to test if we can set accessible to a class that is not related to Netty,
// which is what we do in the isExplicitTryReflectionSetAccessible() method of NettyInspector.
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.apache.pinot.core.instance.context.ServerContext;
import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
import org.apache.pinot.core.transport.ListenerConfig;
import org.apache.pinot.core.transport.NettyInspector;
import org.apache.pinot.core.util.ListenerConfigUtil;
import org.apache.pinot.core.util.trace.ContinuousJfrStarter;
import org.apache.pinot.query.runtime.operator.factory.DefaultQueryOperatorFactoryProvider;
Expand Down Expand Up @@ -294,6 +295,8 @@ public void init(PinotConfiguration serverConf)

_clusterConfigChangeHandler.registerClusterConfigChangeListener(ContinuousJfrStarter.INSTANCE);
initTransitionThreadPoolManager();

NettyInspector.logAllChecks();
}

/**
Expand Down Expand Up @@ -905,6 +908,8 @@ public void start()
} else {
_serverMetrics.addTimedValue(ServerTimer.STARTUP_FAILURE_DURATION_MS, startupDurationMs, TimeUnit.MILLISECONDS);
}

NettyInspector.registerMetrics(_serverMetrics);
}

protected SegmentMultiColTextIndexPreprocessThrottler createMultiColumnIndexPreprocessThrottler() {
Expand Down
3 changes: 3 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2929,6 +2929,9 @@

<!-- Solve NoClassDefFoundError. Borrowed from https://github.com/prometheus/jmx_exporter/issues/802 -->
<exclude>META-INF/versions/9/org/yaml/snakeyaml/internal/**</exclude>

<!-- Exclude NettyInstance because it includes Netty package literals -->
<exclude>src/main/java/org/apache/pinot/core/transport/NettyInstance.java</exclude>
</excludes>
</filter>
</filters>
Expand Down
Loading