Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -190,7 +191,7 @@ else if (parameterType.isAssignableFrom(GlobalKTable.class)) {
protected StreamsBuilderFactoryBean buildStreamsBuilderAndRetrieveConfig(String beanNamePostPrefix,
ApplicationContext applicationContext, String inboundName,
KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties,
StreamsBuilderFactoryBeanConfigurer customizer,
List<StreamsBuilderFactoryBeanConfigurer> customizers,
ConfigurableEnvironment environment, BindingProperties bindingProperties) {
ConfigurableListableBeanFactory beanFactory = this.applicationContext
.getBeanFactory();
Expand Down Expand Up @@ -347,8 +348,8 @@ else if (deserializationExceptionHandler == DeserializationExceptionHandler.skip

extendedConsumerProperties.setApplicationId((String) streamConfiguration.get(StreamsConfig.APPLICATION_ID_CONFIG));

if (customizer != null) {
customizer.configure(streamsBuilderFactoryBean);
if (!CollectionUtils.isEmpty(customizers)) {
customizers.forEach(customizer -> customizer.configure(streamsBuilderFactoryBean));
}
return applicationContext.getBean(
"&stream-builder-" + beanNamePostPrefix, StreamsBuilderFactoryBean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ public KafkaStreamsFunctionProcessor kafkaStreamsFunctionProcessor(BindingServic
return new KafkaStreamsFunctionProcessor(bindingServiceProperties, kafkaStreamsExtendedBindingProperties,
keyValueSerdeResolver, kafkaStreamsBindingInformationCatalogue, kafkaStreamsMessageConversionDelegate,
cleanupConfig.getIfUnique(), streamFunctionProperties, kafkaStreamsBinderConfigurationProperties,
customizerProvider.getIfUnique(), environment);
customizerProvider.orderedStream().toList(), environment);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro
private BeanFactory beanFactory;
private final StreamFunctionProperties streamFunctionProperties;
private final KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties;
StreamsBuilderFactoryBeanConfigurer customizer;
final List<StreamsBuilderFactoryBeanConfigurer> customizers;
ConfigurableEnvironment environment;

public KafkaStreamsFunctionProcessor(BindingServiceProperties bindingServiceProperties,
Expand All @@ -98,7 +98,8 @@ public KafkaStreamsFunctionProcessor(BindingServiceProperties bindingServiceProp
CleanupConfig cleanupConfig,
StreamFunctionProperties streamFunctionProperties,
KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties,
StreamsBuilderFactoryBeanConfigurer customizer, ConfigurableEnvironment environment) {
List<StreamsBuilderFactoryBeanConfigurer> customizers,
ConfigurableEnvironment environment) {
super(bindingServiceProperties, kafkaStreamsBindingInformationCatalogue, kafkaStreamsExtendedBindingProperties,
keyValueSerdeResolver, cleanupConfig);
this.bindingServiceProperties = bindingServiceProperties;
Expand All @@ -108,7 +109,7 @@ public KafkaStreamsFunctionProcessor(BindingServiceProperties bindingServiceProp
this.kafkaStreamsMessageConversionDelegate = kafkaStreamsMessageConversionDelegate;
this.streamFunctionProperties = streamFunctionProperties;
this.kafkaStreamsBinderConfigurationProperties = kafkaStreamsBinderConfigurationProperties;
this.customizer = customizer;
this.customizers = customizers != null ? customizers : List.of();
this.environment = environment;
}

Expand Down Expand Up @@ -523,7 +524,7 @@ private Object[] adaptAndRetrieveInboundArguments(Map<String, ResolvableType> st
//Otherwise, create the StreamsBuilderFactory and get the underlying config.
if (!this.methodStreamsBuilderFactoryBeanMap.containsKey(functionName)) {
StreamsBuilderFactoryBean streamsBuilderFactoryBean = buildStreamsBuilderAndRetrieveConfig(functionName, applicationContext,
input, kafkaStreamsBinderConfigurationProperties, customizer, this.environment, bindingProperties);
input, kafkaStreamsBinderConfigurationProperties, customizers, this.environment, bindingProperties);
this.methodStreamsBuilderFactoryBeanMap.put(functionName, streamsBuilderFactoryBean);
}
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2026-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.stream.binder.kafka.streams;

import java.util.stream.Stream;

import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.function.StreamFunctionProperties;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;
import org.springframework.kafka.core.CleanupConfig;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
* Tests for {@link KafkaStreamsBinderSupportAutoConfiguration}.
*
* @author wadhwaroh-lang
*/
class KafkaStreamsBinderSupportAutoConfigurationTests {

@Test
@SuppressWarnings("unchecked")
void kafkaStreamsFunctionProcessorUsesAllStreamsBuilderFactoryBeanConfigurers() {
StreamsBuilderFactoryBeanConfigurer first = mock(StreamsBuilderFactoryBeanConfigurer.class);
StreamsBuilderFactoryBeanConfigurer second = mock(StreamsBuilderFactoryBeanConfigurer.class);
ObjectProvider<StreamsBuilderFactoryBeanConfigurer> customizerProvider = mock(ObjectProvider.class);
ObjectProvider<CleanupConfig> cleanupConfigProvider = mock(ObjectProvider.class);
when(customizerProvider.orderedStream()).thenReturn(Stream.of(first, second));

KafkaStreamsFunctionProcessor processor = new KafkaStreamsBinderSupportAutoConfiguration()
.kafkaStreamsFunctionProcessor(
mock(BindingServiceProperties.class),
mock(KafkaStreamsExtendedBindingProperties.class),
mock(KeyValueSerdeResolver.class),
mock(KafkaStreamsBindingInformationCatalogue.class),
mock(KafkaStreamsMessageConversionDelegate.class),
cleanupConfigProvider,
mock(StreamFunctionProperties.class),
mock(KafkaStreamsBinderConfigurationProperties.class),
customizerProvider,
mock(ConfigurableEnvironment.class)
);

assertThat(processor.customizers).containsExactly(first, second);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,8 @@ public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer()

`KafkaStreamsCustomizer` will be called by the `StreamsBuilderFactoryBeabn` right before the underlying `KafkaStreams` gets started.

There can only be one `StreamsBuilderFactoryBeanConfigurer` in the entire application.
Then how do we account for multiple Kafka Streams processors as each of them are backed up by individual `StreamsBuilderFactoryBean` objects?
In that case, if the customization needs to be different for those processors, then the application needs to apply some filter based on the application ID.
The binder invokes all `StreamsBuilderFactoryBeanConfigurer` beans in order before the factory bean is started.
If the customization needs to be different for multiple Kafka Streams processors, each backed by its own `StreamsBuilderFactoryBean`, then the application needs to apply some filter based on the application ID.

For e.g,

Expand Down
Loading