From 32939e30fe79e9108a7bde61ed679e727dd8ebc3 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Wed, 3 Feb 2021 10:25:03 -0500 Subject: [PATCH] AdminClient customizer (#1023) * AdminClient customizer Provide the ability for applications to customize AdminClient by introducing a new interface AdminClientConfigCustomizer. Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1014 * Addressing PR review comments --- docs/src/main/asciidoc/overview.adoc | 17 ++++++++++ .../AdminClientConfigCustomizer.java | 31 +++++++++++++++++++ .../provisioning/KafkaTopicProvisioner.java | 13 ++++++-- .../KafkaTopicProvisionerTests.java | 9 ++++-- .../GlobalKTableBinderConfiguration.java | 6 ++-- .../streams/KStreamBinderConfiguration.java | 6 ++-- .../streams/KTableBinderConfiguration.java | 6 ++-- .../config/KafkaBinderConfiguration.java | 7 +++-- .../kafka/AutoCreateTopicDisabledTests.java | 4 +-- .../stream/binder/kafka/KafkaBinderTests.java | 10 +++--- .../binder/kafka/KafkaBinderUnitTests.java | 2 +- .../binder/kafka/KafkaTransactionTests.java | 2 +- 12 files changed, 90 insertions(+), 23 deletions(-) create mode 100644 spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/provisioning/AdminClientConfigCustomizer.java diff --git a/docs/src/main/asciidoc/overview.adoc b/docs/src/main/asciidoc/overview.adoc index 686bb3ae8..2e870cfc9 100644 --- a/docs/src/main/asciidoc/overview.adoc +++ b/docs/src/main/asciidoc/overview.adoc @@ -786,3 +786,20 @@ For example, if you want to gain access to a bean that is defined at the applica When the binder discovers that these customizers are available as beans, it will invoke the `configure` method right before creating the consumer and producer factories. Both of these interfaces also provide access to both the binding and destination names so that they can be accessed while customizing producer and consumer properties. + +[[admin-client-config-customization]] +=== Customizing AdminClient Configuration + +As with consumer and producer config customization above, applications can also customize the configuration for admin clients by providing an `AdminClientConfigCustomizer`. +AdminClientConfigCustomizer's configure method provides access to the admin client properties, using which you can define further customization. +Binder's Kafka topic provisioner gives the highest precedence for the properties given through this customizer. +Here is an example of providing this customizer bean. + +``` +@Bean +public AdminClientConfigCustomizer adminClientConfigCustomizer() { + return props -> { + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL"); + }; +} +``` \ No newline at end of file diff --git a/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/provisioning/AdminClientConfigCustomizer.java b/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/provisioning/AdminClientConfigCustomizer.java new file mode 100644 index 000000000..25b2fe79e --- /dev/null +++ b/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/provisioning/AdminClientConfigCustomizer.java @@ -0,0 +1,31 @@ +/* + * Copyright 2021-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.kafka.provisioning; + +import java.util.Map; + +/** + * Customizer for configuring AdminClient. + * + * @author Soby Chacko + * @since 3.1.2 + */ +@FunctionalInterface +public interface AdminClientConfigCustomizer { + + void configure(Map adminClientProperties); +} diff --git a/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/provisioning/KafkaTopicProvisioner.java b/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/provisioning/KafkaTopicProvisioner.java index a64d4fd34..519883c62 100644 --- a/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/provisioning/KafkaTopicProvisioner.java +++ b/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/provisioning/KafkaTopicProvisioner.java @@ -105,16 +105,23 @@ public class KafkaTopicProvisioner implements * Create an instance. * @param kafkaBinderConfigurationProperties the binder configuration properties. * @param kafkaProperties the boot Kafka properties used to build the + * @param adminClientConfigCustomizer to customize {@link AdminClient}. * {@link AdminClient}. */ public KafkaTopicProvisioner( KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties, - KafkaProperties kafkaProperties) { + KafkaProperties kafkaProperties, + AdminClientConfigCustomizer adminClientConfigCustomizer) { Assert.isTrue(kafkaProperties != null, "KafkaProperties cannot be null"); - this.adminClientProperties = kafkaProperties.buildAdminProperties(); this.configurationProperties = kafkaBinderConfigurationProperties; + this.adminClientProperties = kafkaProperties.buildAdminProperties(); normalalizeBootPropsWithBinder(this.adminClientProperties, kafkaProperties, kafkaBinderConfigurationProperties); + // If the application provides an AdminConfig customizer + // and overrides properties, that takes precedence. + if (adminClientConfigCustomizer != null) { + adminClientConfigCustomizer.configure(this.adminClientProperties); + } } /** @@ -151,7 +158,7 @@ public ProducerDestination provisionProducerDestination(final String name, logger.info("Using kafka topic for outbound: " + name); } KafkaTopicUtils.validateTopicName(name); - try (AdminClient adminClient = AdminClient.create(this.adminClientProperties)) { + try (AdminClient adminClient = createAdminClient()) { createTopic(adminClient, name, properties.getPartitionCount(), false, properties.getExtension().getTopic()); int partitions = 0; diff --git a/spring-cloud-stream-binder-kafka-core/src/test/java/org/springframework/cloud/stream/binder/kafka/provisioning/KafkaTopicProvisionerTests.java b/spring-cloud-stream-binder-kafka-core/src/test/java/org/springframework/cloud/stream/binder/kafka/provisioning/KafkaTopicProvisionerTests.java index 6ceda7b44..fd630191b 100644 --- a/spring-cloud-stream-binder-kafka-core/src/test/java/org/springframework/cloud/stream/binder/kafka/provisioning/KafkaTopicProvisionerTests.java +++ b/spring-cloud-stream-binder-kafka-core/src/test/java/org/springframework/cloud/stream/binder/kafka/provisioning/KafkaTopicProvisionerTests.java @@ -42,6 +42,8 @@ */ public class KafkaTopicProvisionerTests { + AdminClientConfigCustomizer adminClientConfigCustomizer = adminClientProperties -> adminClientProperties.put("foo", "bar"); + @SuppressWarnings("rawtypes") @Test public void bootPropertiesOverriddenExceptServers() throws Exception { @@ -58,7 +60,7 @@ public void bootPropertiesOverriddenExceptServers() throws Exception { ts.getFile().getAbsolutePath()); binderConfig.setBrokers("localhost:9092"); KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderConfig, - bootConfig); + bootConfig, adminClientConfigCustomizer); AdminClient adminClient = provisioner.createAdminClient(); assertThat(KafkaTestUtils.getPropertyValue(adminClient, "client.selector.channelBuilder")).isInstanceOf(SslChannelBuilder.class); @@ -67,6 +69,7 @@ public void bootPropertiesOverriddenExceptServers() throws Exception { assertThat( ((List) configs.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)).get(0)) .isEqualTo("localhost:1234"); + assertThat(configs.get("foo")).isEqualTo("bar"); adminClient.close(); } @@ -86,7 +89,7 @@ public void bootPropertiesOverriddenIncludingServers() throws Exception { ts.getFile().getAbsolutePath()); binderConfig.setBrokers("localhost:1234"); KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderConfig, - bootConfig); + bootConfig, adminClientConfigCustomizer); AdminClient adminClient = provisioner.createAdminClient(); assertThat(KafkaTestUtils.getPropertyValue(adminClient, "client.selector.channelBuilder")).isInstanceOf(SslChannelBuilder.class); @@ -106,7 +109,7 @@ public void brokersInvalid() throws Exception { binderConfig.getConfiguration().put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:1234"); try { - new KafkaTopicProvisioner(binderConfig, bootConfig); + new KafkaTopicProvisioner(binderConfig, bootConfig, adminClientConfigCustomizer); fail("Expected illegal state"); } catch (IllegalStateException e) { diff --git a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/GlobalKTableBinderConfiguration.java b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/GlobalKTableBinderConfiguration.java index 2fcceed02..7eae7953d 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/GlobalKTableBinderConfiguration.java +++ b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/GlobalKTableBinderConfiguration.java @@ -18,11 +18,13 @@ import java.util.Map; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.config.BeanFactoryPostProcessor; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.cloud.stream.binder.kafka.provisioning.AdminClientConfigCustomizer; import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner; import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties; import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties; @@ -46,8 +48,8 @@ public class GlobalKTableBinderConfiguration { @Bean public KafkaTopicProvisioner provisioningProvider( KafkaStreamsBinderConfigurationProperties binderConfigurationProperties, - KafkaProperties kafkaProperties) { - return new KafkaTopicProvisioner(binderConfigurationProperties, kafkaProperties); + KafkaProperties kafkaProperties, ObjectProvider adminClientConfigCustomizer) { + return new KafkaTopicProvisioner(binderConfigurationProperties, kafkaProperties, adminClientConfigCustomizer.getIfUnique()); } @Bean diff --git a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KStreamBinderConfiguration.java b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KStreamBinderConfiguration.java index 29aa0ed9e..9450bc392 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KStreamBinderConfiguration.java +++ b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KStreamBinderConfiguration.java @@ -16,10 +16,12 @@ package org.springframework.cloud.stream.binder.kafka.streams; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.config.BeanFactoryPostProcessor; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.cloud.stream.binder.kafka.provisioning.AdminClientConfigCustomizer; import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner; import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties; import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties; @@ -44,9 +46,9 @@ public class KStreamBinderConfiguration { @Bean public KafkaTopicProvisioner provisioningProvider( KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties, - KafkaProperties kafkaProperties) { + KafkaProperties kafkaProperties, ObjectProvider adminClientConfigCustomizer) { return new KafkaTopicProvisioner(kafkaStreamsBinderConfigurationProperties, - kafkaProperties); + kafkaProperties, adminClientConfigCustomizer.getIfUnique()); } @Bean diff --git a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KTableBinderConfiguration.java b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KTableBinderConfiguration.java index 10f199f37..825e4a2ae 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KTableBinderConfiguration.java +++ b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KTableBinderConfiguration.java @@ -18,11 +18,13 @@ import java.util.Map; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.config.BeanFactoryPostProcessor; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.cloud.stream.binder.kafka.provisioning.AdminClientConfigCustomizer; import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner; import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties; import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties; @@ -46,8 +48,8 @@ public class KTableBinderConfiguration { @Bean public KafkaTopicProvisioner provisioningProvider( KafkaStreamsBinderConfigurationProperties binderConfigurationProperties, - KafkaProperties kafkaProperties) { - return new KafkaTopicProvisioner(binderConfigurationProperties, kafkaProperties); + KafkaProperties kafkaProperties, ObjectProvider adminClientConfigCustomizer) { + return new KafkaTopicProvisioner(binderConfigurationProperties, kafkaProperties, adminClientConfigCustomizer.getIfUnique()); } @Bean diff --git a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/KafkaBinderConfiguration.java b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/KafkaBinderConfiguration.java index ad5ae08f9..953bab7e1 100644 --- a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/KafkaBinderConfiguration.java +++ b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/KafkaBinderConfiguration.java @@ -38,6 +38,7 @@ import org.springframework.cloud.stream.binder.kafka.properties.JaasLoginModuleConfiguration; import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties; import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties; +import org.springframework.cloud.stream.binder.kafka.provisioning.AdminClientConfigCustomizer; import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner; import org.springframework.cloud.stream.binder.kafka.utils.DlqDestinationResolver; import org.springframework.cloud.stream.binder.kafka.utils.DlqPartitionFunction; @@ -104,8 +105,10 @@ KafkaBinderConfigurationProperties configurationProperties( @Bean KafkaTopicProvisioner provisioningProvider( - KafkaBinderConfigurationProperties configurationProperties) { - return new KafkaTopicProvisioner(configurationProperties, this.kafkaProperties); + KafkaBinderConfigurationProperties configurationProperties, + ObjectProvider adminClientConfigCustomizer) { + return new KafkaTopicProvisioner(configurationProperties, + this.kafkaProperties, adminClientConfigCustomizer.getIfUnique()); } @SuppressWarnings("unchecked") diff --git a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/AutoCreateTopicDisabledTests.java b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/AutoCreateTopicDisabledTests.java index 32c32a96a..672a020a0 100644 --- a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/AutoCreateTopicDisabledTests.java +++ b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/AutoCreateTopicDisabledTests.java @@ -65,7 +65,7 @@ public void testAutoCreateTopicDisabledFailsOnConsumerIfTopicNonExistentOnBroker configurationProperties.setAutoCreateTopics(false); KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner( - configurationProperties, kafkaProperties); + configurationProperties, kafkaProperties, null); provisioningProvider.setMetadataRetryOperations(new RetryTemplate()); KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder( @@ -97,7 +97,7 @@ public void testAutoCreateTopicDisabledFailsOnProducerIfTopicNonExistentOnBroker configurationProperties.getConfiguration().put("max.block.ms", "3000"); KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner( - configurationProperties, kafkaProperties); + configurationProperties, kafkaProperties, null); SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(1); final RetryTemplate metadataRetryOperations = new RetryTemplate(); metadataRetryOperations.setRetryPolicy(simpleRetryPolicy); diff --git a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java index 0a3d66dd8..33688243c 100644 --- a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java +++ b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java @@ -209,7 +209,7 @@ protected KafkaTestBinder getBinder() { if (binder == null) { KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties(); KafkaTopicProvisioner kafkaTopicProvisioner = new KafkaTopicProvisioner( - binderConfiguration, new TestKafkaProperties()); + binderConfiguration, new TestKafkaProperties(), null); try { kafkaTopicProvisioner.afterPropertiesSet(); } @@ -232,7 +232,7 @@ private KafkaTestBinder getBinder( DlqPartitionFunction dlqPartitionFunction, DlqDestinationResolver dlqDestinationResolver) { KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner( - kafkaBinderConfigurationProperties, new TestKafkaProperties()); + kafkaBinderConfigurationProperties, new TestKafkaProperties(), null); try { provisioningProvider.afterPropertiesSet(); } @@ -401,7 +401,7 @@ public void testCustomHeaderMapper() throws Exception { binderConfiguration.setHeaderMapperBeanName("headerMapper"); KafkaTopicProvisioner kafkaTopicProvisioner = new KafkaTopicProvisioner( - binderConfiguration, new TestKafkaProperties()); + binderConfiguration, new TestKafkaProperties(), null); try { kafkaTopicProvisioner.afterPropertiesSet(); } @@ -478,7 +478,7 @@ public void testWellKnownHeaderMapperWithBeanNameKafkaHeaderMapper() throws Exce KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties(); KafkaTopicProvisioner kafkaTopicProvisioner = new KafkaTopicProvisioner( - binderConfiguration, new TestKafkaProperties()); + binderConfiguration, new TestKafkaProperties(), null); try { kafkaTopicProvisioner.afterPropertiesSet(); } @@ -3656,7 +3656,7 @@ public void testInternalHeadersNotPropagatedGuts(String name, String[] headerPat binderConfiguration.setHeaderMapperBeanName("headerMapper"); KafkaTopicProvisioner kafkaTopicProvisioner = new KafkaTopicProvisioner( - binderConfiguration, new TestKafkaProperties()); + binderConfiguration, new TestKafkaProperties(), null); try { kafkaTopicProvisioner.afterPropertiesSet(); } diff --git a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderUnitTests.java b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderUnitTests.java index 5f3fce064..234e32eb0 100644 --- a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderUnitTests.java +++ b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderUnitTests.java @@ -78,7 +78,7 @@ public void testPropertyOverrides() throws Exception { KafkaBinderConfigurationProperties binderConfigurationProperties = new KafkaBinderConfigurationProperties( kafkaProperties); KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner( - binderConfigurationProperties, kafkaProperties); + binderConfigurationProperties, kafkaProperties, null); KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder( binderConfigurationProperties, provisioningProvider); KafkaConsumerProperties consumerProps = new KafkaConsumerProperties(); diff --git a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaTransactionTests.java b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaTransactionTests.java index f588f0a8b..518d01fa7 100644 --- a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaTransactionTests.java +++ b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaTransactionTests.java @@ -73,7 +73,7 @@ public void testProducerRunsInTx() { configurationProperties.getTransaction().setTransactionIdPrefix("foo-"); configurationProperties.getTransaction().getProducer().setUseNativeEncoding(true); KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner( - configurationProperties, kafkaProperties); + configurationProperties, kafkaProperties, null); provisioningProvider.setMetadataRetryOperations(new RetryTemplate()); final Producer mockProducer = mock(Producer.class); given(mockProducer.send(any(), any())).willReturn(new SettableListenableFuture<>());