Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
[Transaction] Fix initTransaction might wait until request timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
BewareMyPower committed Aug 4, 2023
1 parent 0241e43 commit 227876c
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2178,10 +2178,6 @@ protected void handleInitProducerId(KafkaHeaderAndRequest kafkaHeaderAndRequest,
.setErrorCode(resp.getError().code())
.setProducerId(resp.getProducerId())
.setProducerEpoch(resp.getProducerEpoch());
if (resp.getError() == Errors.COORDINATOR_LOAD_IN_PROGRESS
|| resp.getError() == Errors.CONCURRENT_TRANSACTIONS) {
responseData.setThrottleTimeMs(1000);
}
response.complete(new InitProducerIdResponse(responseData));
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1153,7 +1153,6 @@ private KafkaProducer<Integer, String> buildTransactionProducer(String transacti
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaServerAdder());
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000 * 10);
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
if (txTimeout > 0) {
producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, txTimeout);
Expand All @@ -1172,7 +1171,6 @@ private KafkaConsumer<Integer, String> buildTransactionConsumer(String groupId,
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaServerAdder());
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000 * 10);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, isolation);
Expand All @@ -1187,7 +1185,6 @@ private KafkaProducer<Integer, String> buildIdempotenceProducer() {
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaServerAdder());
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000 * 10);
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

addCustomizeProps(producerProps);
Expand Down Expand Up @@ -1323,6 +1320,7 @@ public void testNotFencedWithBeginTransaction() throws Exception {
// beginTransaction doesn't do anything
producer1.beginTransaction();

producer1.commitTransaction(); // avoid close() being blocked for request timeout
producer1.close();
producer2.close();
}
Expand Down

0 comments on commit 227876c

Please sign in to comment.