
KAFKA-20251: Add group-level configs for assignment batching and offload#21627

Merged
dajac merged 2 commits into apache:trunk from confluentinc:squah-kip-1263-add-configs
Mar 19, 2026
Conversation

@squah-confluent
Contributor

@squah-confluent squah-confluent commented Mar 4, 2026

Add group-level {consumer,share,streams}.assignment.interval.ms config
options to control the delay between assignment calculation. These
config options override the dynamic broker-level configs.

Add group-level {consumer,share,streams}.assignor.offload.enable config
options to control whether assignment calculation is offloaded to a
group coordinator background thread. These config options override the
dynamic broker-level configs.

Since the broker-level configs for these group-level configs are
dynamic, we have to use a different approach compared to the existing
group-level configs. In the interests of getting these new configs into
Apache Kafka 4.3, we defer refactoring GroupConfig until a later patch.

Reviewers: majialong <majialoong@gmail.com>, Christo Lolov <lolovc@amazon.com>, David Jacot <djacot@confluent.io>

@github-actions github-actions Bot added the triage (PRs from the community), core (Kafka Broker) and group-coordinator labels Mar 4, 2026
Comment on lines +116 to +118
public final int shareAssignmentIntervalMs;

public final Optional<Boolean> shareAssignorOffloadEnable;
Contributor Author
These group-level configs were tricky. We need a special value to denote "use the group coordinator config". For ints, that is -1. For booleans, we have to use null or make the configs strings.
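A minimal sketch of the representation described above (class, method, and key names are illustrative, not the actual Kafka API): for the boolean config there is no spare sentinel value, so an unset key maps to `Optional.empty()` rather than a magic default.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper illustrating the Optional<Boolean> approach: an absent
// key means "use the group coordinator config"; a present key is an override.
final class OffloadConfigParse {
    private OffloadConfigParse() { }

    static Optional<Boolean> parseOffloadEnable(Map<String, String> props, String key) {
        return Optional.ofNullable(props.get(key)).map(Boolean::parseBoolean);
    }
}
```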

Contributor

I was thinking a bit more about this! In other configurations (and correct me if there is a precedent for this), we tend to treat a presence at a more-granular level as an override of the coarser level. As such, if I don't want an override at a group-level in this case, I would omit setting the configuration rather than set it as -1. What do you think?

Contributor Author
@squah-confluent squah-confluent Mar 6, 2026

I would much prefer to default to null and not allow -1. However, there's no precedent for a null default for integer or boolean configs anywhere.

If I set the default for the assignment.interval.ms configs to null, I get a startup error.

        Caused by:                                                                                                                                                                                                                                                                                                         
        java.lang.ExceptionInInitializerError: Exception org.apache.kafka.common.config.ConfigException: Invalid value null for configuration consumer.assignment.interval.ms: Value must be non-null [in thread "Test worker"]                                                                                            
            at org.apache.kafka.common.config.ConfigDef$Range.ensureValid(ConfigDef.java:1008)                                                                                                                                                                                                                             
            at org.apache.kafka.common.config.ConfigDef$ConfigKey.<init>(ConfigDef.java:1342)                                                                                                                                                                                                                              
            at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:158)                                                                                                                                                                                                                                         
            at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:201)                                                                                                                                                                                                                                         
            at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:240)                                                                                                                                                                                                                                         
            at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:402)                                                                                                                                                                                                                                         
            at org.apache.kafka.coordinator.group.GroupConfig.<clinit>(GroupConfig.java:144)        

If I instead set the default to ConfigDef.NO_DEFAULT_VALUE, I get a different error.

org.apache.kafka.common.config.ConfigException: Missing required configuration "consumer.assignment.interval.ms" which has no default value.                                                     

So to make null work for the assignment.interval.ms configs, we would have to define an Optional.of validator, or Null and Any.of validators. Do you think it's worth doing?
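The null-accepting validation being discussed can be sketched in plain Java as follows; this is an assumption-laden illustration, with the actual ConfigDef.Validator wiring omitted and `validateNullableAtLeast` a hypothetical helper, not an existing Kafka API.

```java
// Hypothetical nullable range check: null is accepted and means "fall through
// to the broker-level value"; non-null values must be at least `min`.
final class NullableRange {
    private NullableRange() { }

    static Integer validateNullableAtLeast(String name, Integer value, int min) {
        if (value == null) return null;
        if (value < min)
            throw new IllegalArgumentException("Invalid value " + value
                + " for configuration " + name + ": Value must be at least " + min);
        return value;
    }
}
```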

Contributor
@clolov clolov Mar 6, 2026

Sorry, let me try to explain what I meant in a different way. You have 3 pairs of new configurations which can be set on both the group coordinator level and on the group level.

Here is a similar example for another configuration group.consumer.session.timeout.ms (group coordinator level https://kafka.apache.org/42/configuration/broker-configs/#brokerconfigs_group.consumer.session.timeout.ms) and consumer.session.timeout.ms (group level https://kafka.apache.org/42/configuration/group-configs/#groupconfigs_consumer.session.timeout.ms).

Instead of deciding to use -1 at the group level to fall through to the group coordinator level, the definition at the group level is:

...
.define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
    INT,
    GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT, // <-- We reference the default to be the group coordinator level default
    atLeast(1),
    MEDIUM,
    GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC)
...

Do you reckon we can do something similar here?

Contributor Author

I see! I got something to work.

The group-level defaults must never be used for the new configs. If we use the default value (or the values returned by GroupCoordinatorConfig.extractGroupConfigMap), the defaults will be baked in once any unrelated group-level config is set. Then we won't pick up assignment.interval.ms changes at the group coordinator-level. It's fine for the other configs since their group coordinator-level configs aren't dynamic.

Contributor

Apologies for the delay, I took a few days off in an area with patchy internet 😊

I got the problem after staring at it for a bit more. Basically, the configurations I gave as an example fall through to the group coordinator value set when the broker started. You would like to have the "proper" expected behaviour where a dynamic group coordinator value is picked if there is no group value present. This makes perfect sense!

@dajac dajac removed the triage PRs from the community label Mar 4, 2026
@dajac dajac self-requested a review March 4, 2026 13:02
@airlock-confluentinc airlock-confluentinc Bot force-pushed the squah-kip-1263-add-configs branch from 96bce12 to 7543f43 Compare March 4, 2026 19:15
Contributor
@clolov clolov left a comment

Made an initial pass 😊

.define(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
INT,
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT,
atLeast(-1),
Contributor

I guess we can change it to atLeast(0) after using the coordinator default?

Contributor Author

I forgot to update those, thank you!

@dajac dajac requested a review from clolov March 10, 2026 10:28
Contributor
@dongnuo123 dongnuo123 left a comment

Thanks for the PR! LGTM

Member
@dajac dajac left a comment

@squah-confluent Thanks for the patch. I left a few comments/questions.

.define(CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, STRING, CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT, ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(ConsumerGroupMigrationPolicy.class)), MEDIUM, CONSUMER_GROUP_MIGRATION_POLICY_DOC)
// Interval config used for testing purposes.
.defineInternal(CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG, INT, CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_DEFAULT, atLeast(10 * 1000), MEDIUM, CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_DOC)
.define(CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, INT, CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM, CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_DOC)
Member

nit: Could you please put them before the internal ones?

Contributor Author

Reordered them

.define(SHARE_GROUP_MAX_SIZE_CONFIG, INT, SHARE_GROUP_MAX_SIZE_DEFAULT, between(1, 1000), MEDIUM, SHARE_GROUP_MAX_SIZE_DOC)
.define(SHARE_GROUP_ASSIGNORS_CONFIG, LIST, SHARE_GROUP_ASSIGNORS_DEFAULT, ConfigDef.ValidList.anyNonDuplicateValues(false, false), MEDIUM, SHARE_GROUP_ASSIGNORS_DOC)
.defineInternal(SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_DEFAULT, atLeast(1), LOW, SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_DOC)
.define(SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM, SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_DOC)
Member

ditto.

require(consumerGroupHeartbeatIntervalMs < consumerGroupSessionTimeoutMs,
String.format("%s must be less than %s", CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG));

require(consumerGroupMaxAssignmentIntervalMs >= consumerGroupMinAssignmentIntervalMs,
Member

Are those min/max enforced when the configuration is updated dynamically?

Contributor Author

Yes, kafka-configs.sh returns an error when the new dynamic config is out of range.

 % ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type brokers --entity-default --add-config group.consumer.assignment.interval.ms=31000

Error while executing config command with args '--bootstrap-server localhost:9092 --alter --entity-type brokers --entity-default --add-config group.consumer.assignment.interval.ms=31000'
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidRequestException: group.consumer.assignment.interval.ms must be less than or equal to group.consumer.max.assignment.interval.ms
[2026-03-12 00:26:09,877] ERROR [ConfigAdminManager[nodeId=1]: validation of configProps {group.consumer.assignment.interval.ms=31000, min.insync.replicas=1} for ConfigResource(type=BROKER, name='') failed with exception (kafka.server.ConfigAdminManager)
org.apache.kafka.common.errors.InvalidRequestException: group.consumer.assignment.interval.ms must be less than or equal to group.consumer.max.assignment.interval.ms
[2026-03-12 00:26:09,878] ERROR [ConfigAdminManager[nodeId=1]: Error preprocessing incrementalAlterConfigs request on ConfigResource(type=BROKER, name='') (kafka.server.ConfigAdminManager)
org.apache.kafka.common.errors.InvalidRequestException: group.consumer.assignment.interval.ms must be less than or equal to group.consumer.max.assignment.interval.ms

On broker startup with an out-of-range dynamic override, we get an error.

Stack trace:
[2026-03-12 00:27:43,088] ERROR Cluster default configs could not be applied: [group.consumer.assignment.interval.ms, min.insync.replicas] (kafka.server.DynamicBrokerConfig)                                                                                                                                              
java.lang.IllegalArgumentException: group.consumer.assignment.interval.ms must be less than or equal to group.consumer.max.assignment.interval.ms            
        at org.apache.kafka.common.utils.Utils.require(Utils.java:1710)
        at org.apache.kafka.coordinator.group.GroupCoordinatorConfig.<init>(GroupCoordinatorConfig.java:602)                                                 
        at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:190)
        at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:147)   
        at kafka.server.DynamicBrokerConfig.processReconfiguration(DynamicBrokerConfig.scala:446)                                                            
        at kafka.server.DynamicBrokerConfig.updateCurrentConfig(DynamicBrokerConfig.scala:435)                                                               
        at kafka.server.DynamicBrokerConfig.$anonfun$updateDefaultConfig$1(DynamicBrokerConfig.scala:292)                                                    
        at org.apache.kafka.server.util.LockUtils.inLock(LockUtils.java:96)
        at org.apache.kafka.server.util.LockUtils.inWriteLock(LockUtils.java:115)                                                                            
        at kafka.server.DynamicBrokerConfig.updateDefaultConfig(DynamicBrokerConfig.scala:287)                                                               
        at kafka.server.BrokerConfigHandler.processConfigChanges(ConfigHandler.scala:150)                                                                    
        at kafka.server.metadata.DynamicConfigPublisher.$anonfun$onMetadataUpdate$5(DynamicConfigPublisher.scala:81)                                         
        at scala.Option.foreach(Option.scala:437)           
        at kafka.server.metadata.DynamicConfigPublisher.$anonfun$onMetadataUpdate$2(DynamicConfigPublisher.scala:74)                                         
        at java.base/java.util.HashMap$KeySet.forEach(HashMap.java:1016)
        at kafka.server.metadata.DynamicConfigPublisher.$anonfun$onMetadataUpdate$1(DynamicConfigPublisher.scala:57)                                         
        at kafka.server.metadata.DynamicConfigPublisher.$anonfun$onMetadataUpdate$1$adapted(DynamicConfigPublisher.scala:56)                                 
        at scala.Option.foreach(Option.scala:437)          
        at kafka.server.metadata.DynamicConfigPublisher.onMetadataUpdate(DynamicConfigPublisher.scala:56)                                                    
        at kafka.server.metadata.BrokerMetadataPublisher.onMetadataUpdate(BrokerMetadataPublisher.scala:190)                                                 
        at org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:315)                                                     
        at org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:272)                                    
        at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:134)                                                                 
        at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:217)                                                        
        at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:188)                                                                 
        at java.base/java.lang.Thread.run(Thread.java:1583)

This prevents other dynamic configs from being applied, so I added clamping similar to #21633.

Comment on lines +268 to +270
this.consumerAssignmentIntervalMs = props.containsKey(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG) ?
Optional.of(getInt(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG)) :
Optional.empty();
Member

I am a bit confused by this as we don't have it for consumerHeartbeatIntervalMs. Does it mean that the logic that we have for consumerHeartbeatIntervalMs is incorrect?

Member

I suppose that this is why you had to handle them differently, correct?

Member

Another issue seems to be that we capture the default from the broker when the group manager is created so changing the broker configs dynamically has no effect. Is it correct?

Contributor Author
@squah-confluent squah-confluent Mar 11, 2026

The logic for consumerHeartbeatIntervalMs works correctly since it's not dynamic at the broker level. Since the new assignment.interval.ms configs are dynamic at the broker level, we must not capture any defaults or inherit values from the broker level here.

Another issue seems to be that we capture the default from the broker when the group manager is created so changing the broker configs dynamically has no effect. Is it correct?

Yes, this applies to the heartbeat interval configs. But since those configs aren't dynamic at the broker level, the code works correctly.
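The lookup order described above can be sketched as follows (class and method names are illustrative, not the actual Kafka API): a group-level override wins when present; otherwise the current dynamic broker-level value is read at call time, so broker-side changes are picked up immediately instead of being baked in at construction.

```java
import java.util.Optional;
import java.util.function.IntSupplier;

// Hypothetical resolver: groupOverride holds the group-level config if one was
// explicitly set; currentBrokerValue reads the live dynamic broker-level value.
final class AssignmentIntervalResolver {
    private AssignmentIntervalResolver() { }

    static int effectiveIntervalMs(Optional<Integer> groupOverride, IntSupplier currentBrokerValue) {
        return groupOverride.orElseGet(currentBrokerValue::getAsInt);
    }
}
```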

public Map<String, Integer> extractConsumerGroupConfigMap() {
return Map.of(
GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, consumerGroupSessionTimeoutMs(),
GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, consumerGroupHeartbeatIntervalMs());
Contributor

Thanks for the PR! It seems like the newly added parameters are missing from the corresponding extract*ConfigMap methods (extractConsumerGroupConfigMap, extractShareGroupConfigMap, extractStreamsGroupConfigMap).

Contributor Author

It's intentional. The new group coordinator level configs are dynamic and we must not bake them in.
The new testDynamicBrokerAndGroupConfigs will fail if we do.

Contributor

Thanks for the explanation. While looking into this, I noticed a related issue in the validate path.

When a group-level config (e.g., share.assignment.interval.ms) is not explicitly set, the validate() method uses the hardcoded default value from ConfigDef (e.g., 1000) rather than the broker's current value.

Steps to reproduce:

1. Configure server.properties:
   group.share.assignment.interval.ms=2000
   group.share.min.assignment.interval.ms=2000
2. Try to set a different group config (not share.assignment.interval.ms):
   bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type groups --entity-name group-1 --add-config share.heartbeat.interval.ms=5058
3. Error occurs:
   java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidConfigurationException: share.assignment.interval.ms must be greater than or equal to group.share.min.assignment.interval.ms

Contributor

One possible approach could be adding a new method extractDynamicGroupConfigMap() in GroupCoordinatorConfig.

The existing extractGroupConfigMap() only includes static configs (e.g., session.timeout.ms), which is correct for GroupConfigManager defaults. However, validate() also needs dynamic configs (those in RECONFIGURABLE_CONFIGS, e.g., assignment.interval.ms) to validate against the broker's current values.

The new method could return current broker values for dynamic group configs, and validate() would merge both:

combinedConfigs.putAll(groupCoordinatorConfig.extractGroupConfigMap(shareGroupConfig));
combinedConfigs.putAll(groupCoordinatorConfig.extractDynamicGroupConfigMap());
combinedConfigs.putAll(props);

This way, validation would use the broker's current dynamic values while keeping GroupConfigManager behavior unchanged.

What do you think? Happy to discuss other approaches as well.

Contributor
@majialoong majialoong Mar 12, 2026

Looking further, there might be a more fundamental improvement worth discussing.

The root cause seems to be that GroupConfigManager stores a snapshot of defaults at construction time. These values are then captured in GroupConfig and won't reflect later broker config changes. This causes stale values and requires maintaining static/dynamic config distinctions.

Perhaps we could refactor GroupConfig to only store user-explicit values (all fields as Optional<T>), and have GroupConfigManager provide accessor methods with runtime fallback:

// In GroupConfigManager
public int shareGroupAssignmentIntervalMs(String groupId) {
    return groupConfig(groupId)
        .flatMap(GroupConfig::shareAssignmentIntervalMs)
        .orElse(groupCoordinatorConfig.shareGroupAssignmentIntervalMs());
}

This way, callers wouldn't need to be aware of the fallback logic - they simply call one method and get the effective value, whether from group-level config or broker-level config (static or dynamic).

If this approach sounds reasonable, I'd be happy to work on the implementation.

WDYT?

Contributor Author

The broker-level min and max assignment batching configs are static, so the valid range cannot change after the group configs are baked. Clamping is still applied to group configs at broker startup, to the current min and max range.

The inherited, dynamic broker-level assignment batching config cannot go out of range, since we reject out-of-range alter configs. The dynamic broker-level assignment batching config is also subject to clamping on broker startup, so groups cannot inherit out-of-range values.
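The startup clamping mentioned above amounts to pulling an out-of-range value back into the static [min, max] range instead of failing the whole dynamic-config update; a minimal sketch, with an illustrative helper name:

```java
// Hypothetical clamp applied at broker startup: values below min are raised to
// min, values above max are lowered to max, in-range values pass through.
final class IntervalClamp {
    private IntervalClamp() { }

    static int clampToRange(int value, int min, int max) {
        return Math.min(Math.max(value, min), max);
    }
}
```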

Member

since we reject out-of-range alter configs.

If users alter dynamic broker configs via the controller, how does the controller reject out-of-range values during the RPC if it doesn't have access to the broker's static min/max bounds?

Member

The min/max configs must be deployed over there too.

Member

Just another perspective on GroupConfig: If we leave all members as Optional, the GroupConfig will not be a self-contained object, which compels us to rely on ShareGroupConfigProvider or other Manager classes to resolve the correct configurations.

I was also thinking about this. I think that we could consider handling the fallback within GroupConfig too. It would basically need a reference to the GroupCoordinatorConfig to achieve it. I don't have a strong preference for either ways to be honest.

Member

The min/max configs must be deployed over there too.

I might be missing something here. Do you mean operators must manually keep the static configurations (like min/max ranges) identical across both brokers and controllers to ensure validation works?

Member
@dajac dajac left a comment

The end state is not really satisfying here. We end up with two ways of handling dynamic configurations. This is really confusing. I understand that the current approach is also confusing and it does not support dynamic broker configuration overrides because the default values are captured when the GroupConfig is created.

One possible way forward would be to adopt the pattern proposed by @squah-confluent for all the configurations. Would it be possible?

While we figure out how to address this, should we raise a separate PR to add the static configurations so we can make progress on the other PRs? What do you think, @squah-confluent?

@squah-confluent
Contributor Author

squah-confluent commented Mar 12, 2026

I've split out the broker-level configs into #21730.

@airlock-confluentinc airlock-confluentinc Bot force-pushed the squah-kip-1263-add-configs branch from 1e4feb1 to 9950137 Compare March 13, 2026 01:33
@squah-confluent squah-confluent changed the title KAFKA-20251: Add configs for assignment batching and offload KAFKA-20251: Add group-level configs for assignment batching and offload Mar 13, 2026
@airlock-confluentinc airlock-confluentinc Bot force-pushed the squah-kip-1263-add-configs branch from c72edee to 6dbc59e Compare March 13, 2026 10:24
Add group-level {consumer,share,streams}.assignment.interval.ms config
options to control the delay between assignment calculation. These
config options override the dynamic broker-level configs.

Add group-level {consumer,share,streams}.assignor.offload.enable config
options to control whether assignment calculation is offloaded to a
group coordinator background thread. These config options override the
dynamic broker-level configs.
@airlock-confluentinc airlock-confluentinc Bot force-pushed the squah-kip-1263-add-configs branch from 6dbc59e to 155d411 Compare March 19, 2026 11:37
Contributor
@clolov clolov left a comment

Thanks for the change, the discussion and the next steps 😊 !

Member
@dajac dajac left a comment

LGTM

@squah-confluent and I discussed offline and we have decided to merge this one as-is, even though the approach is sub-optimal. We need it for 4.3, so we don't really have the time to fully refactor it now. However, we will unify the approach as a follow-up for 4.4. This work is tracked by KAFKA-20337.

@dajac dajac merged commit ba0738d into apache:trunk Mar 19, 2026
25 checks passed
@dajac dajac deleted the squah-kip-1263-add-configs branch March 19, 2026 14:44
dajac pushed a commit that referenced this pull request Mar 19, 2026
…oad (#21627)

Add group-level {consumer,share,streams}.assignment.interval.ms config
options to control the delay between assignment calculation. These
config options override the dynamic broker-level configs.

Add group-level {consumer,share,streams}.assignor.offload.enable config
options to control whether assignment calculation is offloaded to a
group coordinator background thread. These config options override the
dynamic broker-level configs.

Since the broker-level configs for these group-level configs are
dynamic, we have to use a different approach compared to the existing
group-level configs. In the interests of getting these new configs into
Apache Kafka 4.3, we defer refactoring GroupConfig until a later patch.

Reviewers: majialong <majialoong@gmail.com>, Christo Lolov
<lolovc@amazon.com>, David Jacot <djacot@confluent.io>
@dajac
Member

dajac commented Mar 19, 2026

Merged to trunk and 4.3.

@majialoong
Contributor

Thanks for the discussion! I see that a JIRA ticket has already been created for the GroupConfig refactor, and I’ve created KAFKA-20340 to follow up on the second point.

chia7712 pushed a commit that referenced this pull request Mar 25, 2026
…21861)

`SharePartition` held both a `GroupConfigManager` and a
`ShareGroupConfigProvider` reference, but `ShareGroupConfigProvider`
already wraps `GroupConfigManager`.

This PR removes the redundant `GroupConfigManager` dependency so that
`SharePartition` only uses `ShareGroupConfigProvider` for dynamic group
configuration lookups, as suggested in

[#21627.](#21627 (comment))

Reviewers: Sean Quah <squah@confluent.io>, Chia-Ping Tsai
 <chia7712@gmail.com>
Shekharrajak pushed a commit to Shekharrajak/kafka that referenced this pull request Mar 31, 2026
…oad (apache#21627)

Shekharrajak pushed a commit to Shekharrajak/kafka that referenced this pull request Mar 31, 2026
…pache#21861)

nileshkumar3 pushed a commit to nileshkumar3/kafka that referenced this pull request Apr 15, 2026
…oad (apache#21627)

nileshkumar3 pushed a commit to nileshkumar3/kafka that referenced this pull request Apr 15, 2026
…pache#21861)
