Skip to content
This repository has been archived by the owner on Nov 20, 2024. It is now read-only.

AdminClient customizer #1023

Merged
merged 2 commits into from
Feb 3, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions docs/src/main/asciidoc/overview.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -786,3 +786,20 @@ For example, if you want to gain access to a bean that is defined at the applica
When the binder discovers that these customizers are available as beans, it will invoke the `configure` method right before creating the consumer and producer factories.

Both of these interfaces also provide access to both the binding and destination names so that they can be accessed while customizing producer and consumer properties.

[[admin-client-config-customization]]
=== Customizing AdminClient Configuration

As with consumer and producer config customization above, applications can also customize the configuration for admin client by providing an AdminClientConfigCustomizer.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
As with consumer and producer config customization above, applications can also customize the configuration for admin client by providing an AdminClientConfigCustomizer.
As with consumer and producer config customization above, applications can also customize the configuration for admin clients by providing an `AdminClientConfigCustomizer`.

Copy link
Contributor Author

@sobychacko sobychacko Feb 3, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good points about the bindings/binder names. Issue created.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant all the customizers (producer, consumer) not just the admin.

AdminClientConfigCustomizer's configure method provides access to the admin client properties, using which you can define further customization.
Binder's Kafka topic provisioner gives the highest precedence for the properties given through this customizer.
Here is an example of providing this customizer bean.

```
@Bean
public AdminClientConfigCustomizer adminClientConfigCustomizer() {
return props -> {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
};
}
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2021-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.stream.binder.kafka.provisioning;

import java.util.Map;

/**
* Customizer for configuring AdminClient.
*
* @author Soby Chacko
* @since 3.1.2
*/
@FunctionalInterface
public interface AdminClientConfigCustomizer {

void configure(Map<String, Object> adminClientProperties);
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,23 @@ public class KafkaTopicProvisioner implements
* Create an instance.
* @param kafkaBinderConfigurationProperties the binder configuration properties.
* @param kafkaProperties the boot Kafka properties used to build the
* @param adminClientConfigCustomizer to customize {@link AdminClient}.
* {@link AdminClient}.
*/
public KafkaTopicProvisioner(
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties,
KafkaProperties kafkaProperties) {
KafkaProperties kafkaProperties,
AdminClientConfigCustomizer adminClientConfigCustomizer) {
Assert.isTrue(kafkaProperties != null, "KafkaProperties cannot be null");
this.adminClientProperties = kafkaProperties.buildAdminProperties();
this.configurationProperties = kafkaBinderConfigurationProperties;
this.adminClientProperties = kafkaProperties.buildAdminProperties();
normalalizeBootPropsWithBinder(this.adminClientProperties, kafkaProperties,
kafkaBinderConfigurationProperties);
// If the application provides an AdminConfig customizer
// and overrides properties, that takes precedence.
if (adminClientConfigCustomizer != null) {
adminClientConfigCustomizer.configure(this.adminClientProperties);
}
}

/**
Expand Down Expand Up @@ -151,7 +158,7 @@ public ProducerDestination provisionProducerDestination(final String name,
logger.info("Using kafka topic for outbound: " + name);
}
KafkaTopicUtils.validateTopicName(name);
try (AdminClient adminClient = AdminClient.create(this.adminClientProperties)) {
try (AdminClient adminClient = createAdminClient()) {
createTopic(adminClient, name, properties.getPartitionCount(), false,
properties.getExtension().getTopic());
int partitions = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
*/
public class KafkaTopicProvisionerTests {

AdminClientConfigCustomizer adminClientConfigCustomizer = adminClientProperties -> adminClientProperties.put("foo", "bar");

@SuppressWarnings("rawtypes")
@Test
public void bootPropertiesOverriddenExceptServers() throws Exception {
Expand All @@ -58,7 +60,7 @@ public void bootPropertiesOverriddenExceptServers() throws Exception {
ts.getFile().getAbsolutePath());
binderConfig.setBrokers("localhost:9092");
KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderConfig,
bootConfig);
bootConfig, adminClientConfigCustomizer);
AdminClient adminClient = provisioner.createAdminClient();
assertThat(KafkaTestUtils.getPropertyValue(adminClient,
"client.selector.channelBuilder")).isInstanceOf(SslChannelBuilder.class);
Expand All @@ -67,6 +69,7 @@ public void bootPropertiesOverriddenExceptServers() throws Exception {
assertThat(
((List) configs.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)).get(0))
.isEqualTo("localhost:1234");
assertThat(configs.get("foo")).isEqualTo("bar");
adminClient.close();
}

Expand All @@ -86,7 +89,7 @@ public void bootPropertiesOverriddenIncludingServers() throws Exception {
ts.getFile().getAbsolutePath());
binderConfig.setBrokers("localhost:1234");
KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderConfig,
bootConfig);
bootConfig, adminClientConfigCustomizer);
AdminClient adminClient = provisioner.createAdminClient();
assertThat(KafkaTestUtils.getPropertyValue(adminClient,
"client.selector.channelBuilder")).isInstanceOf(SslChannelBuilder.class);
Expand All @@ -106,7 +109,7 @@ public void brokersInvalid() throws Exception {
binderConfig.getConfiguration().put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
"localhost:1234");
try {
new KafkaTopicProvisioner(binderConfig, bootConfig);
new KafkaTopicProvisioner(binderConfig, bootConfig, adminClientConfigCustomizer);
fail("Expected illegal state");
}
catch (IllegalStateException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

import java.util.Map;

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.AdminClientConfigCustomizer;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
Expand All @@ -46,8 +48,8 @@ public class GlobalKTableBinderConfiguration {
@Bean
public KafkaTopicProvisioner provisioningProvider(
KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
KafkaProperties kafkaProperties) {
return new KafkaTopicProvisioner(binderConfigurationProperties, kafkaProperties);
KafkaProperties kafkaProperties, ObjectProvider<AdminClientConfigCustomizer> adminClientConfigCustomizer) {
return new KafkaTopicProvisioner(binderConfigurationProperties, kafkaProperties, adminClientConfigCustomizer.getIfUnique());
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@

package org.springframework.cloud.stream.binder.kafka.streams;

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.AdminClientConfigCustomizer;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
Expand All @@ -44,9 +46,9 @@ public class KStreamBinderConfiguration {
@Bean
public KafkaTopicProvisioner provisioningProvider(
KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties,
KafkaProperties kafkaProperties) {
KafkaProperties kafkaProperties, ObjectProvider<AdminClientConfigCustomizer> adminClientConfigCustomizer) {
return new KafkaTopicProvisioner(kafkaStreamsBinderConfigurationProperties,
kafkaProperties);
kafkaProperties, adminClientConfigCustomizer.getIfUnique());
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

import java.util.Map;

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.AdminClientConfigCustomizer;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
Expand All @@ -46,8 +48,8 @@ public class KTableBinderConfiguration {
@Bean
public KafkaTopicProvisioner provisioningProvider(
KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
KafkaProperties kafkaProperties) {
return new KafkaTopicProvisioner(binderConfigurationProperties, kafkaProperties);
KafkaProperties kafkaProperties, ObjectProvider<AdminClientConfigCustomizer> adminClientConfigCustomizer) {
return new KafkaTopicProvisioner(binderConfigurationProperties, kafkaProperties, adminClientConfigCustomizer.getIfUnique());
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.springframework.cloud.stream.binder.kafka.properties.JaasLoginModuleConfiguration;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.AdminClientConfigCustomizer;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.utils.DlqDestinationResolver;
import org.springframework.cloud.stream.binder.kafka.utils.DlqPartitionFunction;
Expand Down Expand Up @@ -104,8 +105,10 @@ KafkaBinderConfigurationProperties configurationProperties(

@Bean
KafkaTopicProvisioner provisioningProvider(
KafkaBinderConfigurationProperties configurationProperties) {
return new KafkaTopicProvisioner(configurationProperties, this.kafkaProperties);
KafkaBinderConfigurationProperties configurationProperties,
ObjectProvider<AdminClientConfigCustomizer> adminClientConfigCustomizer) {
return new KafkaTopicProvisioner(configurationProperties,
this.kafkaProperties, adminClientConfigCustomizer.getIfUnique());
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.AdminClientConfigCustomizer;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
Expand All @@ -52,6 +53,8 @@ public class AutoCreateTopicDisabledTests {
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, 1)
.brokerProperty(KafkaConfig.AutoCreateTopicsEnableProp(), "false");

AdminClientConfigCustomizer adminClientConfigCustomizer = adminClientProperties -> adminClientProperties.put("foo", "bar");

@Test
public void testAutoCreateTopicDisabledFailsOnConsumerIfTopicNonExistentOnBroker()
throws Throwable {
Expand All @@ -65,7 +68,7 @@ public void testAutoCreateTopicDisabledFailsOnConsumerIfTopicNonExistentOnBroker
configurationProperties.setAutoCreateTopics(false);

KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner(
configurationProperties, kafkaProperties);
configurationProperties, kafkaProperties, adminClientConfigCustomizer);
provisioningProvider.setMetadataRetryOperations(new RetryTemplate());

KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(
Expand Down Expand Up @@ -97,7 +100,7 @@ public void testAutoCreateTopicDisabledFailsOnProducerIfTopicNonExistentOnBroker
configurationProperties.getConfiguration().put("max.block.ms", "3000");

KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner(
configurationProperties, kafkaProperties);
configurationProperties, kafkaProperties, adminClientConfigCustomizer);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear why we're changing this test since we aren't verifying that the customizer was called. It's already covered by an earlier test.

Ditto the next test.

SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(1);
final RetryTemplate metadataRetryOperations = new RetryTemplate();
metadataRetryOperations.setRetryPolicy(simpleRetryPolicy);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.AdminClientConfigCustomizer;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.utils.DlqDestinationResolver;
import org.springframework.cloud.stream.binder.kafka.utils.DlqPartitionFunction;
Expand Down Expand Up @@ -181,6 +182,8 @@ public class KafkaBinderTests extends

private AdminClient adminClient;

AdminClientConfigCustomizer adminClientConfigCustomizer = adminClientProperties -> adminClientProperties.put("foo", "bar");

@Override
protected ExtendedConsumerProperties<KafkaConsumerProperties> createConsumerProperties() {
final ExtendedConsumerProperties<KafkaConsumerProperties> kafkaConsumerProperties = new ExtendedConsumerProperties<>(
Expand Down Expand Up @@ -209,7 +212,7 @@ protected KafkaTestBinder getBinder() {
if (binder == null) {
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
KafkaTopicProvisioner kafkaTopicProvisioner = new KafkaTopicProvisioner(
binderConfiguration, new TestKafkaProperties());
binderConfiguration, new TestKafkaProperties(), adminClientConfigCustomizer);
try {
kafkaTopicProvisioner.afterPropertiesSet();
}
Expand All @@ -232,7 +235,7 @@ private KafkaTestBinder getBinder(
DlqPartitionFunction dlqPartitionFunction, DlqDestinationResolver dlqDestinationResolver) {

KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner(
kafkaBinderConfigurationProperties, new TestKafkaProperties());
kafkaBinderConfigurationProperties, new TestKafkaProperties(), adminClientConfigCustomizer);
try {
provisioningProvider.afterPropertiesSet();
}
Expand Down Expand Up @@ -401,7 +404,7 @@ public void testCustomHeaderMapper() throws Exception {
binderConfiguration.setHeaderMapperBeanName("headerMapper");

KafkaTopicProvisioner kafkaTopicProvisioner = new KafkaTopicProvisioner(
binderConfiguration, new TestKafkaProperties());
binderConfiguration, new TestKafkaProperties(), adminClientConfigCustomizer);
try {
kafkaTopicProvisioner.afterPropertiesSet();
}
Expand Down Expand Up @@ -478,7 +481,7 @@ public void testWellKnownHeaderMapperWithBeanNameKafkaHeaderMapper() throws Exce
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();

KafkaTopicProvisioner kafkaTopicProvisioner = new KafkaTopicProvisioner(
binderConfiguration, new TestKafkaProperties());
binderConfiguration, new TestKafkaProperties(), adminClientConfigCustomizer);
try {
kafkaTopicProvisioner.afterPropertiesSet();
}
Expand Down Expand Up @@ -3656,7 +3659,7 @@ public void testInternalHeadersNotPropagatedGuts(String name, String[] headerPat
binderConfiguration.setHeaderMapperBeanName("headerMapper");

KafkaTopicProvisioner kafkaTopicProvisioner = new KafkaTopicProvisioner(
binderConfiguration, new TestKafkaProperties());
binderConfiguration, new TestKafkaProperties(), adminClientConfigCustomizer);
try {
kafkaTopicProvisioner.afterPropertiesSet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.AdminClientConfigCustomizer;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
Expand Down Expand Up @@ -72,13 +73,15 @@
*/
public class KafkaBinderUnitTests {

AdminClientConfigCustomizer adminClientConfigCustomizer = adminClientProperties -> adminClientProperties.put("foo", "bar");

@Test
public void testPropertyOverrides() throws Exception {
KafkaProperties kafkaProperties = new TestKafkaProperties();
KafkaBinderConfigurationProperties binderConfigurationProperties = new KafkaBinderConfigurationProperties(
kafkaProperties);
KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner(
binderConfigurationProperties, kafkaProperties);
binderConfigurationProperties, kafkaProperties, adminClientConfigCustomizer);
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(
binderConfigurationProperties, provisioningProvider);
KafkaConsumerProperties consumerProps = new KafkaConsumerProperties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.AdminClientConfigCustomizer;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.channel.DirectChannel;
Expand Down Expand Up @@ -62,6 +63,8 @@ public class KafkaTransactionTests {
.brokerProperty("transaction.state.log.replication.factor", "1")
.brokerProperty("transaction.state.log.min.isr", "1");

AdminClientConfigCustomizer adminClientConfigCustomizer = adminClientProperties -> adminClientProperties.put("foo", "bar");

@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testProducerRunsInTx() {
Expand All @@ -73,7 +76,7 @@ public void testProducerRunsInTx() {
configurationProperties.getTransaction().setTransactionIdPrefix("foo-");
configurationProperties.getTransaction().getProducer().setUseNativeEncoding(true);
KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner(
configurationProperties, kafkaProperties);
configurationProperties, kafkaProperties, adminClientConfigCustomizer);
provisioningProvider.setMetadataRetryOperations(new RetryTemplate());
final Producer mockProducer = mock(Producer.class);
given(mockProducer.send(any(), any())).willReturn(new SettableListenableFuture<>());
Expand Down