KAFKA-20251: Add group-level configs for assignment batching and offload (#21627)
dajac merged 2 commits into apache:trunk
Conversation
public final int shareAssignmentIntervalMs;
public final Optional<Boolean> shareAssignorOffloadEnable;
These group-level configs were tricky. We need a special value to denote "use the group coordinator config". For ints, that is -1. For booleans, we have to use null or make the configs strings.
I was thinking a bit more about this! In other configurations (and correct me if there is a precedent for this), we tend to treat a presence at a more-granular level as an override of the coarser level. As such, if I don't want an override at a group-level in this case, I would omit setting the configuration rather than set it as -1. What do you think?
I would much prefer to default to null and not allow -1. However, there's no precedent for having a default of null for integer or boolean configs anywhere.
If I set the default for the assignment.interval.ms configs to null, I get a startup error.
Caused by:
java.lang.ExceptionInInitializerError: Exception org.apache.kafka.common.config.ConfigException: Invalid value null for configuration consumer.assignment.interval.ms: Value must be non-null [in thread "Test worker"]
at org.apache.kafka.common.config.ConfigDef$Range.ensureValid(ConfigDef.java:1008)
at org.apache.kafka.common.config.ConfigDef$ConfigKey.<init>(ConfigDef.java:1342)
at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:158)
at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:201)
at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:240)
at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:402)
at org.apache.kafka.coordinator.group.GroupConfig.<clinit>(GroupConfig.java:144)
If I instead set the default to ConfigDef.NO_DEFAULT_VALUE, I get a different error.
org.apache.kafka.common.config.ConfigException: Missing required configuration "consumer.assignment.interval.ms" which has no default value.
So to make null work for the assignment.interval.ms configs, we would have to define an Optional.of validator, or Null and Any.of validators. Do you think it's worth doing?
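For illustration, a validator that treats null as "unset" and otherwise range-checks could look roughly like the sketch below. The Validator interface here is a local stand-in modelled on ConfigDef.Validator, not Kafka's actual class, and the config name is only an example:

```java
// Minimal sketch of a null-tolerant range validator, assuming null means
// "fall through to the group coordinator level value". The Validator
// interface is a local stand-in for ConfigDef.Validator.
public class NullableValidatorSketch {
    interface Validator {
        void ensureValid(String name, Object value); // throws on invalid input
    }

    // Accepts null; otherwise requires an Integer >= min.
    static Validator nullableAtLeast(int min) {
        return (name, value) -> {
            if (value == null) return; // unset: inherit from the coordinator config
            int v = (Integer) value;
            if (v < min) {
                throw new IllegalArgumentException("Invalid value " + v
                    + " for configuration " + name + ": must be at least " + min);
            }
        };
    }

    public static void main(String[] args) {
        Validator v = nullableAtLeast(0);
        v.ensureValid("consumer.assignment.interval.ms", null);  // ok: unset
        v.ensureValid("consumer.assignment.interval.ms", 1000);  // ok: in range
        boolean rejected = false;
        try {
            v.ensureValid("consumer.assignment.interval.ms", -5);
        } catch (IllegalArgumentException e) {
            rejected = true;
        }
        System.out.println("rejected out-of-range: " + rejected);
    }
}
```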
Sorry, let me try to explain what I meant in a different way. You have 3 pairs of new configurations which can be set on both the group coordinator level and on the group level.
Here is a similar example for another configuration group.consumer.session.timeout.ms (group coordinator level https://kafka.apache.org/42/configuration/broker-configs/#brokerconfigs_group.consumer.session.timeout.ms) and consumer.session.timeout.ms (group level https://kafka.apache.org/42/configuration/group-configs/#groupconfigs_consumer.session.timeout.ms).
Instead of deciding to use -1 at the group level to fall through to the group coordinator level, the definition at the group level is:
...
.define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
INT,
GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT, <-- We reference the default to be the group coordinator level default
atLeast(1),
MEDIUM,
GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC)
...
Do you reckon we can do something similar here?
I see! I got something to work.
The group-level defaults must never be used for the new configs. If we use the default value (or the values returned by GroupCoordinatorConfig.extractGroupConfigMap), the defaults will be baked in once any unrelated group-level config is set. Then we won't pick up assignment.interval.ms changes at the group coordinator-level. It's fine for the other configs since their group coordinator-level configs aren't dynamic.
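The difference can be sketched as follows. This is a toy model of the two behaviours, not the actual implementation; all names are illustrative:

```java
import java.util.Optional;

// Sketch: contrasts baking the coordinator default into the group config at
// creation time with resolving it lazily via Optional. A baked value never
// observes a later dynamic change to the coordinator-level interval.
public class DynamicFallbackSketch {
    // Mutable stand-in for the dynamic group coordinator level config.
    static int coordinatorAssignmentIntervalMs = 1000;

    // Baked approach: value captured when the group config is created.
    static int bakedValue = coordinatorAssignmentIntervalMs;

    // Fallback approach: only an explicit group-level override is stored.
    static Optional<Integer> groupOverride = Optional.empty();

    static int effectiveIntervalMs() {
        return groupOverride.orElse(coordinatorAssignmentIntervalMs);
    }

    public static void main(String[] args) {
        coordinatorAssignmentIntervalMs = 5000; // dynamic broker-level update
        System.out.println("baked: " + bakedValue);               // stale
        System.out.println("fallback: " + effectiveIntervalMs()); // current
    }
}
```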
Apologies for the delay, I took a few days off in an area with patchy internet 😊
I got the problem after staring at it for a bit more. Basically, the configurations I gave as an example fall through to the group coordinator value set when the broker started. You would like to have the "proper" expected behaviour where a dynamic group coordinator value is picked if there is no group value present. This makes perfect sense!
Force-pushed from 96bce12 to 7543f43
.define(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
    INT,
    GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT,
    atLeast(-1),
I guess we can change it to atLeast(0) after using the coordinator default?
I forgot to update those, thank you!
dongnuo123
left a comment
Thanks for the PR! LGTM
dajac
left a comment
@squah-confluent Thanks for the patch. I left a few comments/questions.
.define(CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, STRING, CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT, ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(ConsumerGroupMigrationPolicy.class)), MEDIUM, CONSUMER_GROUP_MIGRATION_POLICY_DOC)
// Interval config used for testing purposes.
.defineInternal(CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG, INT, CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_DEFAULT, atLeast(10 * 1000), MEDIUM, CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_DOC)
.define(CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, INT, CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM, CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_DOC)
nit: Could you please put them before the internal ones?
Reordered them
.define(SHARE_GROUP_MAX_SIZE_CONFIG, INT, SHARE_GROUP_MAX_SIZE_DEFAULT, between(1, 1000), MEDIUM, SHARE_GROUP_MAX_SIZE_DOC)
.define(SHARE_GROUP_ASSIGNORS_CONFIG, LIST, SHARE_GROUP_ASSIGNORS_DEFAULT, ConfigDef.ValidList.anyNonDuplicateValues(false, false), MEDIUM, SHARE_GROUP_ASSIGNORS_DOC)
.defineInternal(SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_DEFAULT, atLeast(1), LOW, SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_DOC)
.define(SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM, SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_DOC)
require(consumerGroupHeartbeatIntervalMs < consumerGroupSessionTimeoutMs,
    String.format("%s must be less than %s", CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG));
require(consumerGroupMaxAssignmentIntervalMs >= consumerGroupMinAssignmentIntervalMs,
Are those min/max bounds enforced when the configuration is updated dynamically?
Yes, kafka-configs.sh returns an error when the new dynamic config is out of range.
% ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type brokers --entity-default --add-config group.consumer.assignment.interval.ms=31000
Error while executing config command with args '--bootstrap-server localhost:9092 --alter --entity-type brokers --entity-default --add-config group.consumer.assignment.interval.ms=31000'
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidRequestException: group.consumer.assignment.interval.ms must be less than or equal to group.consumer.max.assignment.interval.ms
[2026-03-12 00:26:09,877] ERROR [ConfigAdminManager[nodeId=1]: validation of configProps {group.consumer.assignment.interval.ms=31000, min.insync.replicas=1} for ConfigResource(type=BROKER, name='') failed with exception (kafka.server.ConfigAdminManager)
org.apache.kafka.common.errors.InvalidRequestException: group.consumer.assignment.interval.ms must be less than or equal to group.consumer.max.assignment.interval.ms
[2026-03-12 00:26:09,878] ERROR [ConfigAdminManager[nodeId=1]: Error preprocessing incrementalAlterConfigs request on ConfigResource(type=BROKER, name='') (kafka.server.ConfigAdminManager)
org.apache.kafka.common.errors.InvalidRequestException: group.consumer.assignment.interval.ms must be less than or equal to group.consumer.max.assignment.interval.ms
On broker startup with an out-of-range dynamic override, we get an error.
Stack trace:
[2026-03-12 00:27:43,088] ERROR Cluster default configs could not be applied: [group.consumer.assignment.interval.ms, min.insync.replicas] (kafka.server.DynamicBrokerConfig)
java.lang.IllegalArgumentException: group.consumer.assignment.interval.ms must be less than or equal to group.consumer.max.assignment.interval.ms
at org.apache.kafka.common.utils.Utils.require(Utils.java:1710)
at org.apache.kafka.coordinator.group.GroupCoordinatorConfig.<init>(GroupCoordinatorConfig.java:602)
at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:190)
at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:147)
at kafka.server.DynamicBrokerConfig.processReconfiguration(DynamicBrokerConfig.scala:446)
at kafka.server.DynamicBrokerConfig.updateCurrentConfig(DynamicBrokerConfig.scala:435)
at kafka.server.DynamicBrokerConfig.$anonfun$updateDefaultConfig$1(DynamicBrokerConfig.scala:292)
at org.apache.kafka.server.util.LockUtils.inLock(LockUtils.java:96)
at org.apache.kafka.server.util.LockUtils.inWriteLock(LockUtils.java:115)
at kafka.server.DynamicBrokerConfig.updateDefaultConfig(DynamicBrokerConfig.scala:287)
at kafka.server.BrokerConfigHandler.processConfigChanges(ConfigHandler.scala:150)
at kafka.server.metadata.DynamicConfigPublisher.$anonfun$onMetadataUpdate$5(DynamicConfigPublisher.scala:81)
at scala.Option.foreach(Option.scala:437)
at kafka.server.metadata.DynamicConfigPublisher.$anonfun$onMetadataUpdate$2(DynamicConfigPublisher.scala:74)
at java.base/java.util.HashMap$KeySet.forEach(HashMap.java:1016)
at kafka.server.metadata.DynamicConfigPublisher.$anonfun$onMetadataUpdate$1(DynamicConfigPublisher.scala:57)
at kafka.server.metadata.DynamicConfigPublisher.$anonfun$onMetadataUpdate$1$adapted(DynamicConfigPublisher.scala:56)
at scala.Option.foreach(Option.scala:437)
at kafka.server.metadata.DynamicConfigPublisher.onMetadataUpdate(DynamicConfigPublisher.scala:56)
at kafka.server.metadata.BrokerMetadataPublisher.onMetadataUpdate(BrokerMetadataPublisher.scala:190)
at org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:315)
at org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:272)
at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:134)
at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:217)
at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:188)
at java.base/java.lang.Thread.run(Thread.java:1583)
This prevents other dynamic configs from being applied, so I added clamping similar to #21633.
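The clamping idea can be sketched in a few lines. This is a toy model of the startup behaviour (names and bounds are illustrative, not the actual implementation): an out-of-range dynamic override is pulled back into [min, max] instead of failing, so a single bad value cannot block the remaining dynamic configs from being applied.

```java
// Sketch: clamp a dynamic override into the statically configured range
// at broker startup instead of throwing, per the approach in #21633.
public class ClampSketch {
    static int clamp(int value, int min, int max) {
        return Math.max(min, Math.min(max, value));
    }

    public static void main(String[] args) {
        int minIntervalMs = 0;       // illustrative min.assignment.interval.ms
        int maxIntervalMs = 30_000;  // illustrative max.assignment.interval.ms
        System.out.println(clamp(31_000, minIntervalMs, maxIntervalMs)); // clamped down
        System.out.println(clamp(15_000, minIntervalMs, maxIntervalMs)); // unchanged
    }
}
```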
this.consumerAssignmentIntervalMs = props.containsKey(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG) ?
    Optional.of(getInt(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG)) :
    Optional.empty();
I am a bit confused by this as we don't have it for consumerHeartbeatIntervalMs. Does it mean that the logic that we have for consumerHeartbeatIntervalMs is incorrect?
I suppose that this is why you had to handle them differently, correct?
Another issue seems to be that we capture the default from the broker when the group manager is created so changing the broker configs dynamically has no effect. Is it correct?
The logic for consumerHeartbeatIntervalMs works correctly since it's not dynamic at the broker level. Since the new assignment.interval.ms configs are dynamic at the broker level, we must not capture any defaults or inherit values from the broker level here.
Another issue seems to be that we capture the default from the broker when the group manager is created so changing the broker configs dynamically has no effect. Is it correct?
Yes, this applies to the heartbeat interval configs. But since those configs aren't dynamic at the broker level, the code works correctly.
public Map<String, Integer> extractConsumerGroupConfigMap() {
    return Map.of(
        GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, consumerGroupSessionTimeoutMs(),
        GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, consumerGroupHeartbeatIntervalMs());
Thanks for the PR! It seems like the newly added parameters are missing from the corresponding extract*ConfigMap methods (extractConsumerGroupConfigMap, extractShareGroupConfigMap, extractStreamsGroupConfigMap).
It's intentional. The new group coordinator level configs are dynamic and we must not bake them in.
The new testDynamicBrokerAndGroupConfigs will fail if we do.
Thanks for the explanation. While looking into this, I noticed a related issue in the validate path.
When a group-level config (e.g., share.assignment.interval.ms) is not explicitly set, the validate() method uses the hardcoded default value from ConfigDef (e.g., 1000) rather than the broker's current value.
Steps to reproduce:
- Configure server.properties:
  group.share.assignment.interval.ms=2000
  group.share.min.assignment.interval.ms=2000
- Try to set a different group config (not share.assignment.interval.ms):
  bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type groups --entity-name group-1 --add-config share.heartbeat.interval.ms=5058
- An error occurs:
  java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidConfigurationException: share.assignment.interval.ms must be greater than or equal to group.share.min.assignment.interval.ms
One possible approach could be adding a new method extractDynamicGroupConfigMap() in GroupCoordinatorConfig.
The existing extractGroupConfigMap() only includes static configs (e.g., session.timeout.ms), which is correct for GroupConfigManager defaults. However, validate() also needs dynamic configs (those in RECONFIGURABLE_CONFIGS, e.g., assignment.interval.ms) to validate against the broker's current values.
The new method could return current broker values for dynamic group configs, and validate() would merge both:
combinedConfigs.putAll(groupCoordinatorConfig.extractGroupConfigMap(shareGroupConfig));
combinedConfigs.putAll(groupCoordinatorConfig.extractDynamicGroupConfigMap());
combinedConfigs.putAll(props);
This way, validation would use the broker's current dynamic values while keeping GroupConfigManager behavior unchanged.
What do you think? Happy to discuss other approaches as well.
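The intended layering can be demonstrated with plain maps. This is only a model of the proposal (extractDynamicGroupConfigMap is a hypothetical method, and the values are illustrative): later putAll calls win, so explicit group props override dynamic broker values, which in turn override the static defaults.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the proposed validate() layering: static defaults, then current
// dynamic broker values, then the user-provided group properties.
public class ValidateLayeringSketch {
    public static void main(String[] args) {
        Map<String, Object> staticDefaults = Map.of("share.assignment.interval.ms", 1000);
        Map<String, Object> dynamicBroker = Map.of("share.assignment.interval.ms", 2000);
        Map<String, Object> explicitProps = Map.of("share.heartbeat.interval.ms", 5058);

        Map<String, Object> combined = new HashMap<>();
        combined.putAll(staticDefaults);  // models extractGroupConfigMap(...)
        combined.putAll(dynamicBroker);   // models extractDynamicGroupConfigMap()
        combined.putAll(explicitProps);   // user-provided group props

        System.out.println(combined.get("share.assignment.interval.ms")); // broker's current value
        System.out.println(combined.get("share.heartbeat.interval.ms"));  // explicit override
    }
}
```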
Looking further, there might be a more fundamental improvement worth discussing.
The root cause seems to be that GroupConfigManager stores a snapshot of defaults at construction time. These values are then captured in GroupConfig and won't reflect later broker config changes. This causes stale values and requires maintaining static/dynamic config distinctions.
Perhaps we could refactor GroupConfig to only store user-explicit values (all fields as Optional<T>), and have GroupConfigManager provide accessor methods with runtime fallback:
// In GroupConfigManager
public int shareGroupAssignmentIntervalMs(String groupId) {
    return groupConfig(groupId)
        .flatMap(GroupConfig::shareAssignmentIntervalMs)
        .orElse(groupCoordinatorConfig.shareGroupAssignmentIntervalMs());
}
This way, callers wouldn't need to be aware of the fallback logic - they simply call one method and get the effective value, whether from group-level config or broker-level config (static or dynamic).
If this approach sounds reasonable, I'd be happy to work on the implementation.
WDYT?
The broker-level min and max assignment batching configs are static, so the valid range cannot change after the group configs are baked. Clamping to the current min/max range is still applied to group configs at broker startup.
The inherited, dynamic broker-level assignment batching config cannot go out of range, since we reject out-of-range alter configs. It is also clamped at broker startup, so groups cannot inherit out-of-range values.
since we reject out-of-range alter configs.
If users alter dynamic broker configs via the controller, how does the controller reject out-of-range values during the RPC if it doesn't have access to the broker's static min/max bounds?
The min/max configs must be deployed over there too.
Just another perspective on GroupConfig: If we leave all members as Optional, the GroupConfig will not be a self-contained object, which compels us to rely on ShareGroupConfigProvider or other Manager classes to resolve the correct configurations.
I was also thinking about this. I think that we could consider handling the fallback within GroupConfig too. It would basically need a reference to the GroupCoordinatorConfig to achieve it. I don't have a strong preference for either way, to be honest.
The min/max configs must be deployed over there too.
I might be missing something here. Do you mean operators must manually keep the static configurations (like min/max ranges) identical across both brokers and controllers to ensure validation works?
dajac
left a comment
The end state is not really satisfying here. We end up with two ways of handling dynamic configurations. This is really confusing. I understand that the current approach is also confusing and it does not support dynamic broker configuration overrides because the default values are captured when the GroupConfig is created.
One possible way forward would be to adopt the pattern proposed by @squah-confluent for all the configurations. Would it be possible?
While we figure out how to address this, should we raise a separate PR to add the static configurations so we can make progress on the other PRs? What do you think, @squah-confluent?
I've split out the broker-level configs into #21730.
Force-pushed from 1e4feb1 to 9950137
Force-pushed from c72edee to 6dbc59e
Add group-level {consumer,share,streams}.assignment.interval.ms config
options to control the delay between assignment calculation. These
config options override the dynamic broker-level configs.
Add group-level {consumer,share,streams}.assignor.offload.enable config
options to control whether assignment calculation is offloaded to a
group coordinator background thread. These config options override the
dynamic broker-level configs.
Force-pushed from 6dbc59e to 155d411
clolov
left a comment
Thanks for the change, the discussion and the next steps 😊 !
dajac
left a comment
LGTM
@squah-confluent and I discussed offline and we have decided to merge this one as-is, even though the approach is sub-optimal. We need it for 4.3 so we don't really have the time to fully refactor it now. However, we will unify the approach as a follow-up for 4.4. This work is tracked by KAFKA-20337.
KAFKA-20251: Add group-level configs for assignment batching and offload (#21627)

Add group-level {consumer,share,streams}.assignment.interval.ms config options to control the delay between assignment calculation. These config options override the dynamic broker-level configs.

Add group-level {consumer,share,streams}.assignor.offload.enable config options to control whether assignment calculation is offloaded to a group coordinator background thread. These config options override the dynamic broker-level configs.

Since the broker-level configs for these group-level configs are dynamic, we have to use a different approach compared to the existing group-level configs. In the interests of getting these new configs into Apache Kafka 4.3, we defer refactoring GroupConfig until a later patch.

Reviewers: majialong <majialoong@gmail.com>, Christo Lolov <lolovc@amazon.com>, David Jacot <djacot@confluent.io>
Merged to trunk and 4.3.
Thanks for the discussion! I see that a JIRA ticket has already been created for the …
(#21861) `SharePartition` held both a `GroupConfigManager` and a `ShareGroupConfigProvider` reference, but `ShareGroupConfigProvider` already wraps `GroupConfigManager`. This PR removes the redundant `GroupConfigManager` dependency so that `SharePartition` only uses `ShareGroupConfigProvider` for dynamic group configuration lookups, as suggested in #21627.

Reviewers: Sean Quah <squah@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>