KAFKA-15853: [1/N] Move static methods, SocketServer and Log configs to AbstractKafkaConfig#21548
Conversation
|
@clolov Hello. I will re-assign ticket to you. Thanks for picking this up. That's the big one :) |
|
A label of 'needs-attention' was automatically added to this PR in order to raise the |
14e269d to
91504d9
Compare
|
we need to fix the failed |
| return getInt(ServerLogConfigs.NUM_PARTITIONS_CONFIG); | ||
| } | ||
|
|
||
| public Integer logSegmentBytes() { |
There was a problem hiding this comment.
Should we return int instead of Integer here? IIRC, this config has a default value defined in ConfigDef, so it won't be null
There was a problem hiding this comment.
I did this boxing (and the other ones you pointed out) to satisfy the compiler's the result type of an implicit conversion must be more specific than Object in KafkaConfig. If you swap this to int you should be able to see what I mean 😊. Once everything is in Java, I believe we ought to be able to revert back to primitive types.
There was a problem hiding this comment.
> Task :core:compileScala
[Error] /home/chia7712/project/kafka/core/src/main/scala/kafka/server/KafkaConfig.scala:580:52: the result type of an implicit conversion must be more specific than Object
one error found
you are right 😢
| return getInt(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG); | ||
| } | ||
|
|
||
| public Long logFlushIntervalMessages() { |
| return getInt(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG); | ||
| } | ||
|
|
||
| public Map<String, Integer> maxConnectionsPerIpOverrides() { |
There was a problem hiding this comment.
We could leverage lambda style.
public Map<String, Integer> maxConnectionsPerIpOverrides() {
return getMap(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG,
getString(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG))
.entrySet()
.stream()
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> Integer.parseInt(e.getValue())));
}| * @return the type if found, or empty | ||
| */ | ||
| public static Optional<ConfigDef.Type> configTypeExact(String exactName) { | ||
| Optional<ConfigDef.Type> t = configDefTypeOf(exactName); |
There was a problem hiding this comment.
we could refactor configTypeExact, configType, and configType
public static Optional<ConfigDef.Type> configDefTypeOf(String name) {
return Optional.ofNullable(CONFIG_DEF.configKeys().get(name))
.map(key -> key.type);
}
public static Optional<ConfigDef.Type> configType(String configName) {
return configDefTypeOf(configName)
.or(() -> Optional.ofNullable(DynamicConfig.Broker.configKeys().get(configName))
.map(key -> key.type))
.or(() -> DynamicBrokerConfig.brokerConfigSynonyms(configName, true)
.stream()
.map(AbstractKafkaConfig::configDefTypeOf)
.flatMap(Optional::stream)
.findFirst());
}| import java.util.Set; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| import static org.apache.kafka.common.config.ConfigResource.Type.BROKER; |
| public Long logRollTimeJitterMillis() { | ||
| Long millis = getLong(ServerLogConfigs.LOG_ROLL_TIME_JITTER_MILLIS_CONFIG); | ||
| if (millis != null) return millis; | ||
| return 60L * 60L * 1000L * getInt(ServerLogConfigs.LOG_ROLL_TIME_JITTER_HOURS_CONFIG); |
There was a problem hiding this comment.
Maybe we could use TimeUnit instead?
return TimeUnit.HOURS.toMillis(getInt(ServerLogConfigs.LOG_ROLL_TIME_JITTER_HOURS_CONFIG));| return getLong(ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_CONFIG); | ||
| } | ||
|
|
||
| public Long logRetentionTimeMillis() { |
There was a problem hiding this comment.
public Long logRetentionTimeMillis() {
Long millis = getLong(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG);
if (millis == null) {
Integer mins = getInt(ServerLogConfigs.LOG_RETENTION_TIME_MINUTES_CONFIG);
if (mins != null) {
millis = TimeUnit.MINUTES.toMillis(mins);
} else {
millis = TimeUnit.HOURS.toMillis(getInt(ServerLogConfigs.LOG_RETENTION_TIME_HOURS_CONFIG));
}
}
return millis < 0 ? -1L : millis;
}There was a problem hiding this comment.
For this one, I have changed to use TimeUnit, but the last line needs to be return millis < 0 ? Long.valueOf(-1) : millis;. Otherwise spotbugs complains with
Boxed value is unboxed and then immediately reboxed in org.apache.kafka.server.config.AbstractKafkaConfig.logRetentionTimeMillis()
This is one of those boxing/unboxing issues which ought to disappear once we move fully to Java
|
|
||
| @volatile private var defaultMaxConnectionsPerIp: Int = config.maxConnectionsPerIp | ||
| @volatile private var maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides.map { case (host, count) => (InetAddress.getByName(host), count) } | ||
| @volatile private var maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides.asScala.map { case (host, count) => (InetAddress.getByName(host), count.intValue()) }.toMap |
There was a problem hiding this comment.
It might be better to use a Java Map for maxConnectionsPerIpOverrides, but it's a minor nitpick. Happy to leave it for a follow-up
…to AbstractKafkaConfig (apache#21548) This is a gradual migration of KafkaConfig to Java for easier reviews. Moved the static methods to AbstractKafkaConfig and the methods used to access SocketServer and Log configs. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
…to AbstractKafkaConfig (apache#21548) This is a gradual migration of KafkaConfig to Java for easier reviews. Moved the static methods to AbstractKafkaConfig and the methods used to access SocketServer and Log configs. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This is a gradual migration of KafkaConfig to Java for easier reviews.
Moved the static methods to AbstractKafkaConfig and the methods used to
access SocketServer and Log configs.
Reviewers: Chia-Ping Tsai chia7712@gmail.com