From b10b22767f8063321c90bc9ee1b0aadc5902c31a Mon Sep 17 00:00:00 2001 From: Penghui Li Date: Tue, 3 Dec 2024 05:59:42 +0800 Subject: [PATCH] Support set client.id with Zone ID for Kafka driver (#424) * Support set client.id with Zone ID for Kafka driver * Apply spotless fix --- .../driver/kafka/KafkaBenchmarkDriver.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java b/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java index a2789ac2a..d506b5b37 100644 --- a/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java +++ b/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java @@ -45,6 +45,9 @@ public class KafkaBenchmarkDriver implements BenchmarkDriver { + private static final String ZONE_ID_CONFIG = "zone.id"; + private static final String ZONE_ID_TEMPLATE = "{zone.id}"; + private static final String KAFKA_CLIENT_ID = "client.id"; private Config config; private List producers = Collections.synchronizedList(new ArrayList<>()); @@ -63,6 +66,13 @@ public void initialize(File configurationFile, StatsLogger statsLogger) throws I Properties commonProperties = new Properties(); commonProperties.load(new StringReader(config.commonConfig)); + if (commonProperties.containsKey(KAFKA_CLIENT_ID)) { + commonProperties.put( + KAFKA_CLIENT_ID, + applyZoneId( + commonProperties.getProperty(KAFKA_CLIENT_ID), System.getProperty(ZONE_ID_CONFIG))); + } + producerProperties = new Properties(); commonProperties.forEach((key, value) -> producerProperties.put(key, value)); producerProperties.load(new StringReader(config.producerConfig)); @@ -151,6 +161,10 @@ public void close() throws Exception { admin.close(); } + private static String applyZoneId(String clientId, String zoneId) { + return clientId.replace(ZONE_ID_TEMPLATE, zoneId); + } + private static final ObjectMapper mapper = new ObjectMapper(new YAMLFactory()) .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);