diff --git a/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQBenchmarkDriver.java b/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQBenchmarkDriver.java index cc54c7577..ccad035fd 100644 --- a/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQBenchmarkDriver.java +++ b/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQBenchmarkDriver.java @@ -92,14 +92,15 @@ public String getTopicNamePrefix() { int fetchCnt = 0; - private synchronized Set fetchMasterAndSlaveAddrByClusterName(final MQAdminExt adminExt, - final String clusterName) throws RemotingConnectException, RemotingSendRequestException, - RemotingTimeoutException, MQBrokerException, InterruptedException { + private synchronized Set fetchMasterAndSlaveAddrByClusterName( + final MQAdminExt adminExt, final String clusterName) + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, + MQBrokerException, InterruptedException { Set brokerList = cachedBrokerAddr.get(clusterName); if (brokerList == null) { brokerList = - CommandUtil.fetchMasterAndSlaveAddrByClusterName( - adminExt, this.rmqClientConfig.clusterName); + CommandUtil.fetchMasterAndSlaveAddrByClusterName( + adminExt, this.rmqClientConfig.clusterName); cachedBrokerAddr.put(clusterName, brokerList); if (brokerList.isEmpty()) { throw new RuntimeException("get brokerAddr return null, clusterName: " + clusterName); @@ -114,35 +115,37 @@ private synchronized Set fetchMasterAndSlaveAddrByClusterName(final MQAd @Override public CompletableFuture createTopic(final String topic, final int partitions) { return CompletableFuture.runAsync( - () -> { - TopicConfig topicConfig = new TopicConfig(); - topicConfig.setOrder(false); - topicConfig.setPerm(6); - topicConfig.setReadQueueNums(partitions); - topicConfig.setWriteQueueNums(partitions); - topicConfig.setTopicName(topic); - if (Boolean.TRUE.equals(this.rmqClientConfig.batchCQ)) { - topicConfig.getAttributes().put("+" + TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName(), "BatchCQ"); - } + () -> { + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setOrder(false); + topicConfig.setPerm(6); + topicConfig.setReadQueueNums(partitions); + topicConfig.setWriteQueueNums(partitions); + topicConfig.setTopicName(topic); + if (Boolean.TRUE.equals(this.rmqClientConfig.batchCQ)) { + topicConfig + .getAttributes() + .put("+" + TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName(), "BatchCQ"); + } - try { - Set brokerList = - fetchMasterAndSlaveAddrByClusterName( - this.rmqAdmin, this.rmqClientConfig.clusterName); - topicConfig.setReadQueueNums(Math.max(1, partitions / brokerList.size())); - topicConfig.setWriteQueueNums(Math.max(1, partitions / brokerList.size())); + try { + Set brokerList = + fetchMasterAndSlaveAddrByClusterName( + this.rmqAdmin, this.rmqClientConfig.clusterName); + topicConfig.setReadQueueNums(Math.max(1, partitions / brokerList.size())); + topicConfig.setWriteQueueNums(Math.max(1, partitions / brokerList.size())); - for (String brokerAddr : brokerList) { - this.rmqAdmin.createAndUpdateTopicConfig(brokerAddr, topicConfig); + for (String brokerAddr : brokerList) { + this.rmqAdmin.createAndUpdateTopicConfig(brokerAddr, topicConfig); + } + } catch (Exception e) { + throw new RuntimeException( + String.format( + "Failed to create topic [%s] to cluster [%s]", + topic, this.rmqClientConfig.clusterName), + e); } - } catch (Exception e) { - throw new RuntimeException( - String.format( - "Failed to create topic [%s] to cluster [%s]", - topic, this.rmqClientConfig.clusterName), - e); - } - }); + }); } @Override