Skip to content
This repository has been archived by the owner on Nov 20, 2024. It is now read-only.

Commit

Permalink
AdminClient customizer (#1023)
Browse files Browse the repository at this point in the history
* AdminClient customizer

Provide the ability for applications to customize AdminClient by
introducing a new interface AdminClientConfigCustomizer.

Resolves #1014

* Addressing PR review comments
  • Loading branch information
sobychacko authored Feb 3, 2021
1 parent cadb542 commit 32939e3
Show file tree
Hide file tree
Showing 12 changed files with 90 additions and 23 deletions.
17 changes: 17 additions & 0 deletions docs/src/main/asciidoc/overview.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -786,3 +786,20 @@ For example, if you want to gain access to a bean that is defined at the applica
When the binder discovers that these customizers are available as beans, it will invoke the `configure` method right before creating the consumer and producer factories.

Both of these interfaces also provide access to both the binding and destination names so that they can be accessed while customizing producer and consumer properties.

[[admin-client-config-customization]]
=== Customizing AdminClient Configuration

As with consumer and producer config customization above, applications can also customize the configuration for admin clients by providing an `AdminClientConfigCustomizer`.
AdminClientConfigCustomizer's configure method provides access to the admin client properties, using which you can define further customization.
Binder's Kafka topic provisioner gives the highest precedence for the properties given through this customizer.
Here is an example of providing this customizer bean.

```
@Bean
public AdminClientConfigCustomizer adminClientConfigCustomizer() {
return props -> {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
};
}
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2021-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.stream.binder.kafka.provisioning;

import java.util.Map;

/**
* Customizer for configuring AdminClient.
*
* @author Soby Chacko
* @since 3.1.2
*/
@FunctionalInterface
public interface AdminClientConfigCustomizer {

void configure(Map<String, Object> adminClientProperties);
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,23 @@ public class KafkaTopicProvisioner implements
* Create an instance.
* @param kafkaBinderConfigurationProperties the binder configuration properties.
* @param kafkaProperties the boot Kafka properties used to build the
* @param adminClientConfigCustomizer to customize {@link AdminClient}.
* {@link AdminClient}.
*/
public KafkaTopicProvisioner(
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties,
KafkaProperties kafkaProperties) {
KafkaProperties kafkaProperties,
AdminClientConfigCustomizer adminClientConfigCustomizer) {
Assert.isTrue(kafkaProperties != null, "KafkaProperties cannot be null");
this.adminClientProperties = kafkaProperties.buildAdminProperties();
this.configurationProperties = kafkaBinderConfigurationProperties;
this.adminClientProperties = kafkaProperties.buildAdminProperties();
normalalizeBootPropsWithBinder(this.adminClientProperties, kafkaProperties,
kafkaBinderConfigurationProperties);
// If the application provides an AdminConfig customizer
// and overrides properties, that takes precedence.
if (adminClientConfigCustomizer != null) {
adminClientConfigCustomizer.configure(this.adminClientProperties);
}
}

/**
Expand Down Expand Up @@ -151,7 +158,7 @@ public ProducerDestination provisionProducerDestination(final String name,
logger.info("Using kafka topic for outbound: " + name);
}
KafkaTopicUtils.validateTopicName(name);
try (AdminClient adminClient = AdminClient.create(this.adminClientProperties)) {
try (AdminClient adminClient = createAdminClient()) {
createTopic(adminClient, name, properties.getPartitionCount(), false,
properties.getExtension().getTopic());
int partitions = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
*/
public class KafkaTopicProvisionerTests {

AdminClientConfigCustomizer adminClientConfigCustomizer = adminClientProperties -> adminClientProperties.put("foo", "bar");

@SuppressWarnings("rawtypes")
@Test
public void bootPropertiesOverriddenExceptServers() throws Exception {
Expand All @@ -58,7 +60,7 @@ public void bootPropertiesOverriddenExceptServers() throws Exception {
ts.getFile().getAbsolutePath());
binderConfig.setBrokers("localhost:9092");
KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderConfig,
bootConfig);
bootConfig, adminClientConfigCustomizer);
AdminClient adminClient = provisioner.createAdminClient();
assertThat(KafkaTestUtils.getPropertyValue(adminClient,
"client.selector.channelBuilder")).isInstanceOf(SslChannelBuilder.class);
Expand All @@ -67,6 +69,7 @@ public void bootPropertiesOverriddenExceptServers() throws Exception {
assertThat(
((List) configs.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)).get(0))
.isEqualTo("localhost:1234");
assertThat(configs.get("foo")).isEqualTo("bar");
adminClient.close();
}

Expand All @@ -86,7 +89,7 @@ public void bootPropertiesOverriddenIncludingServers() throws Exception {
ts.getFile().getAbsolutePath());
binderConfig.setBrokers("localhost:1234");
KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderConfig,
bootConfig);
bootConfig, adminClientConfigCustomizer);
AdminClient adminClient = provisioner.createAdminClient();
assertThat(KafkaTestUtils.getPropertyValue(adminClient,
"client.selector.channelBuilder")).isInstanceOf(SslChannelBuilder.class);
Expand All @@ -106,7 +109,7 @@ public void brokersInvalid() throws Exception {
binderConfig.getConfiguration().put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
"localhost:1234");
try {
new KafkaTopicProvisioner(binderConfig, bootConfig);
new KafkaTopicProvisioner(binderConfig, bootConfig, adminClientConfigCustomizer);
fail("Expected illegal state");
}
catch (IllegalStateException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

import java.util.Map;

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.AdminClientConfigCustomizer;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
Expand All @@ -46,8 +48,8 @@ public class GlobalKTableBinderConfiguration {
@Bean
public KafkaTopicProvisioner provisioningProvider(
KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
KafkaProperties kafkaProperties) {
return new KafkaTopicProvisioner(binderConfigurationProperties, kafkaProperties);
KafkaProperties kafkaProperties, ObjectProvider<AdminClientConfigCustomizer> adminClientConfigCustomizer) {
return new KafkaTopicProvisioner(binderConfigurationProperties, kafkaProperties, adminClientConfigCustomizer.getIfUnique());
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@

package org.springframework.cloud.stream.binder.kafka.streams;

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.AdminClientConfigCustomizer;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
Expand All @@ -44,9 +46,9 @@ public class KStreamBinderConfiguration {
@Bean
public KafkaTopicProvisioner provisioningProvider(
KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties,
KafkaProperties kafkaProperties) {
KafkaProperties kafkaProperties, ObjectProvider<AdminClientConfigCustomizer> adminClientConfigCustomizer) {
return new KafkaTopicProvisioner(kafkaStreamsBinderConfigurationProperties,
kafkaProperties);
kafkaProperties, adminClientConfigCustomizer.getIfUnique());
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

import java.util.Map;

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.AdminClientConfigCustomizer;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
Expand All @@ -46,8 +48,8 @@ public class KTableBinderConfiguration {
@Bean
public KafkaTopicProvisioner provisioningProvider(
KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
KafkaProperties kafkaProperties) {
return new KafkaTopicProvisioner(binderConfigurationProperties, kafkaProperties);
KafkaProperties kafkaProperties, ObjectProvider<AdminClientConfigCustomizer> adminClientConfigCustomizer) {
return new KafkaTopicProvisioner(binderConfigurationProperties, kafkaProperties, adminClientConfigCustomizer.getIfUnique());
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.springframework.cloud.stream.binder.kafka.properties.JaasLoginModuleConfiguration;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.AdminClientConfigCustomizer;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.utils.DlqDestinationResolver;
import org.springframework.cloud.stream.binder.kafka.utils.DlqPartitionFunction;
Expand Down Expand Up @@ -104,8 +105,10 @@ KafkaBinderConfigurationProperties configurationProperties(

@Bean
KafkaTopicProvisioner provisioningProvider(
KafkaBinderConfigurationProperties configurationProperties) {
return new KafkaTopicProvisioner(configurationProperties, this.kafkaProperties);
KafkaBinderConfigurationProperties configurationProperties,
ObjectProvider<AdminClientConfigCustomizer> adminClientConfigCustomizer) {
return new KafkaTopicProvisioner(configurationProperties,
this.kafkaProperties, adminClientConfigCustomizer.getIfUnique());
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void testAutoCreateTopicDisabledFailsOnConsumerIfTopicNonExistentOnBroker
configurationProperties.setAutoCreateTopics(false);

KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner(
configurationProperties, kafkaProperties);
configurationProperties, kafkaProperties, null);
provisioningProvider.setMetadataRetryOperations(new RetryTemplate());

KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(
Expand Down Expand Up @@ -97,7 +97,7 @@ public void testAutoCreateTopicDisabledFailsOnProducerIfTopicNonExistentOnBroker
configurationProperties.getConfiguration().put("max.block.ms", "3000");

KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner(
configurationProperties, kafkaProperties);
configurationProperties, kafkaProperties, null);
SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(1);
final RetryTemplate metadataRetryOperations = new RetryTemplate();
metadataRetryOperations.setRetryPolicy(simpleRetryPolicy);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ protected KafkaTestBinder getBinder() {
if (binder == null) {
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
KafkaTopicProvisioner kafkaTopicProvisioner = new KafkaTopicProvisioner(
binderConfiguration, new TestKafkaProperties());
binderConfiguration, new TestKafkaProperties(), null);
try {
kafkaTopicProvisioner.afterPropertiesSet();
}
Expand All @@ -232,7 +232,7 @@ private KafkaTestBinder getBinder(
DlqPartitionFunction dlqPartitionFunction, DlqDestinationResolver dlqDestinationResolver) {

KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner(
kafkaBinderConfigurationProperties, new TestKafkaProperties());
kafkaBinderConfigurationProperties, new TestKafkaProperties(), null);
try {
provisioningProvider.afterPropertiesSet();
}
Expand Down Expand Up @@ -401,7 +401,7 @@ public void testCustomHeaderMapper() throws Exception {
binderConfiguration.setHeaderMapperBeanName("headerMapper");

KafkaTopicProvisioner kafkaTopicProvisioner = new KafkaTopicProvisioner(
binderConfiguration, new TestKafkaProperties());
binderConfiguration, new TestKafkaProperties(), null);
try {
kafkaTopicProvisioner.afterPropertiesSet();
}
Expand Down Expand Up @@ -478,7 +478,7 @@ public void testWellKnownHeaderMapperWithBeanNameKafkaHeaderMapper() throws Exce
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();

KafkaTopicProvisioner kafkaTopicProvisioner = new KafkaTopicProvisioner(
binderConfiguration, new TestKafkaProperties());
binderConfiguration, new TestKafkaProperties(), null);
try {
kafkaTopicProvisioner.afterPropertiesSet();
}
Expand Down Expand Up @@ -3656,7 +3656,7 @@ public void testInternalHeadersNotPropagatedGuts(String name, String[] headerPat
binderConfiguration.setHeaderMapperBeanName("headerMapper");

KafkaTopicProvisioner kafkaTopicProvisioner = new KafkaTopicProvisioner(
binderConfiguration, new TestKafkaProperties());
binderConfiguration, new TestKafkaProperties(), null);
try {
kafkaTopicProvisioner.afterPropertiesSet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void testPropertyOverrides() throws Exception {
KafkaBinderConfigurationProperties binderConfigurationProperties = new KafkaBinderConfigurationProperties(
kafkaProperties);
KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner(
binderConfigurationProperties, kafkaProperties);
binderConfigurationProperties, kafkaProperties, null);
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(
binderConfigurationProperties, provisioningProvider);
KafkaConsumerProperties consumerProps = new KafkaConsumerProperties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void testProducerRunsInTx() {
configurationProperties.getTransaction().setTransactionIdPrefix("foo-");
configurationProperties.getTransaction().getProducer().setUseNativeEncoding(true);
KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner(
configurationProperties, kafkaProperties);
configurationProperties, kafkaProperties, null);
provisioningProvider.setMetadataRetryOperations(new RetryTemplate());
final Producer mockProducer = mock(Producer.class);
given(mockProducer.send(any(), any())).willReturn(new SettableListenableFuture<>());
Expand Down

0 comments on commit 32939e3

Please sign in to comment.