Skip to content

KAFKA-15853: [1/N] Move static methods, SocketServer and Log configs to AbstractKafkaConfig#21548

Merged
chia7712 merged 7 commits intoapache:trunkfrom
clolov:kafka-15853-draft
Mar 27, 2026
Merged

KAFKA-15853: [1/N] Move static methods, SocketServer and Log configs to AbstractKafkaConfig#21548
chia7712 merged 7 commits intoapache:trunkfrom
clolov:kafka-15853-draft

Conversation

@clolov
Copy link
Copy Markdown
Contributor

@clolov clolov commented Feb 23, 2026

This is a gradual migration of KafkaConfig to Java for easier reviews.

Moved the static methods to AbstractKafkaConfig and the methods used to
access SocketServer and Log configs.

Reviewers: Chia-Ping Tsai chia7712@gmail.com

@github-actions github-actions Bot added triage PRs from the community core Kafka Broker labels Feb 23, 2026
@UladzislauBlok
Copy link
Copy Markdown
Contributor

@clolov Hello. I will re-assign ticket to you. Thanks for picking this up. That's the big one :)

@github-actions
Copy link
Copy Markdown

github-actions Bot commented Mar 3, 2026

A label of 'needs-attention' was automatically added to this PR in order to raise the
attention of the committers. Once this issue has been triaged, the triage label
should be removed to prevent this automation from happening again.

Comment thread server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java Outdated
@github-actions github-actions Bot removed needs-attention triage PRs from the community labels Mar 6, 2026
@clolov clolov force-pushed the kafka-15853-draft branch from 14e269d to 91504d9 Compare March 18, 2026 13:56
@clolov clolov changed the title KAFKA-15853: Move static methods and SocketServer configs to AbstractKafkaConfig KAFKA-15853: [1/N] Move static methods, SocketServer and Log configs to AbstractKafkaConfig Mar 18, 2026
@clolov clolov marked this pull request as ready for review March 18, 2026 14:01
@chia7712
Copy link
Copy Markdown
Member

we need to fix the failed testSpecificProperties by making following change.

- assertEquals(Map("127.0.0.1" -> 2, "127.0.0.2" -> 3), config.maxConnectionsPerIpOverrides)
+ assertEquals(util.Map.of("127.0.0.1", 2, "127.0.0.2", 3), config.maxConnectionsPerIpOverrides)

return getInt(ServerLogConfigs.NUM_PARTITIONS_CONFIG);
}

public Integer logSegmentBytes() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we return int instead of Integer here? IIRC, this config has a default value defined in ConfigDef, so it won't be null

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did this boxing (and the other ones you pointed out) to satisfy the compiler's the result type of an implicit conversion must be more specific than Object in KafkaConfig. If you swap this to int you should be able to see what I mean 😊. Once everything is in Java, I believe we ought to be able to revert back to primitive types.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

> Task :core:compileScala
[Error] /home/chia7712/project/kafka/core/src/main/scala/kafka/server/KafkaConfig.scala:580:52: the result type of an implicit conversion must be more specific than Object
one error found

you are right 😢

return getInt(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG);
}

public Long logFlushIntervalMessages() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

return getInt(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG);
}

public Map<String, Integer> maxConnectionsPerIpOverrides() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could leverage lambda style.

    public Map<String, Integer> maxConnectionsPerIpOverrides() {
        return getMap(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG,
                getString(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG))
                .entrySet()
                .stream()
                .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> Integer.parseInt(e.getValue())));
    }

* @return the type if found, or empty
*/
public static Optional<ConfigDef.Type> configTypeExact(String exactName) {
Optional<ConfigDef.Type> t = configDefTypeOf(exactName);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could refactor configTypeExact, configType, and configType

public static Optional<ConfigDef.Type> configDefTypeOf(String name) {
    return Optional.ofNullable(CONFIG_DEF.configKeys().get(name))
                   .map(key -> key.type);
}

public static Optional<ConfigDef.Type> configType(String configName) {
    return configDefTypeOf(configName)
            .or(() -> Optional.ofNullable(DynamicConfig.Broker.configKeys().get(configName))
                              .map(key -> key.type))
            .or(() -> DynamicBrokerConfig.brokerConfigSynonyms(configName, true)
                              .stream()
                              .map(AbstractKafkaConfig::configDefTypeOf)
                              .flatMap(Optional::stream)
                              .findFirst());
}

import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is unnecessary

public Long logRollTimeJitterMillis() {
Long millis = getLong(ServerLogConfigs.LOG_ROLL_TIME_JITTER_MILLIS_CONFIG);
if (millis != null) return millis;
return 60L * 60L * 1000L * getInt(ServerLogConfigs.LOG_ROLL_TIME_JITTER_HOURS_CONFIG);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could use TimeUnit instead?

return TimeUnit.HOURS.toMillis(getInt(ServerLogConfigs.LOG_ROLL_TIME_JITTER_HOURS_CONFIG));

return getLong(ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_CONFIG);
}

public Long logRetentionTimeMillis() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    public Long logRetentionTimeMillis() {
        Long millis = getLong(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG);
        if (millis == null) {
            Integer mins = getInt(ServerLogConfigs.LOG_RETENTION_TIME_MINUTES_CONFIG);
            if (mins != null) {
                millis = TimeUnit.MINUTES.toMillis(mins);
            } else {
                millis = TimeUnit.HOURS.toMillis(getInt(ServerLogConfigs.LOG_RETENTION_TIME_HOURS_CONFIG));
            }
        }
        return millis < 0 ? -1L : millis;
    }

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this one, I have changed to use TimeUnit, but the last line needs to be return millis < 0 ? Long.valueOf(-1) : millis;. Otherwise spotbugs complains with

Boxed value is unboxed and then immediately reboxed in org.apache.kafka.server.config.AbstractKafkaConfig.logRetentionTimeMillis()

This is one of those boxing/unboxing issues which ought to disappear once we move fully to Java


@volatile private var defaultMaxConnectionsPerIp: Int = config.maxConnectionsPerIp
@volatile private var maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides.map { case (host, count) => (InetAddress.getByName(host), count) }
@volatile private var maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides.asScala.map { case (host, count) => (InetAddress.getByName(host), count.intValue()) }.toMap
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be better to use a Java Map for maxConnectionsPerIpOverrides, but it's a minor nitpick. Happy to leave it for a follow-up

@chia7712 chia7712 merged commit 58b56c2 into apache:trunk Mar 27, 2026
24 checks passed
Shekharrajak pushed a commit to Shekharrajak/kafka that referenced this pull request Mar 31, 2026
…to AbstractKafkaConfig (apache#21548)

This is a gradual migration of KafkaConfig to Java for easier reviews.

Moved the static methods to AbstractKafkaConfig and the methods used to
access SocketServer and Log configs.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
nileshkumar3 pushed a commit to nileshkumar3/kafka that referenced this pull request Apr 15, 2026
…to AbstractKafkaConfig (apache#21548)

This is a gradual migration of KafkaConfig to Java for easier reviews.

Moved the static methods to AbstractKafkaConfig and the methods used to
access SocketServer and Log configs.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Kafka Broker

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants