diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java index 6c877affe7..3afa9cce24 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java @@ -116,9 +116,10 @@ public synchronized void run() { } // add the topicPartition with timeout error if it's not existed in responseMap entriesPerPartition.keySet().forEach(topicPartition -> { - if (!responseMap.containsKey(topicPartition)) { + ProduceResponse.PartitionResponse response = responseMap.putIfAbsent(topicPartition, + new ProduceResponse.PartitionResponse(Errors.REQUEST_TIMED_OUT)); + if (response == null) { log.error("Adding dummy REQUEST_TIMED_OUT to produce response for {}", topicPartition); - responseMap.put(topicPartition, new ProduceResponse.PartitionResponse(Errors.REQUEST_TIMED_OUT)); } }); if (log.isDebugEnabled()) {