From 6108953bddaeb73c82f5501fcf0d822ad1fb791f Mon Sep 17 00:00:00 2001
From: Enrico Olivelli
Date: Fri, 10 Mar 2023 16:29:14 +0100
Subject: [PATCH] [transactions] Implement KIP-664 - DESCRIBE_TRANSACTIONS (#77)

(cherry picked from commit 1f2fe99e0e88c6bba1291525c242563a5eb94ef2)
---
 .../handlers/kop/KafkaCommandDecoder.java    |   6 +
 .../handlers/kop/KafkaRequestHandler.java    |  13 ++
 .../transaction/TransactionCoordinator.java  |  72 +++++++++
 .../transaction/TransactionTest.java         | 144 ++++--------------
 4 files changed, 123 insertions(+), 112 deletions(-)

diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java
index 11ce298762..dbce355edf 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java
@@ -328,6 +328,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
             case LIST_TRANSACTIONS:
                 handleListTransactionsRequest(kafkaHeaderAndRequest, responseFuture);
                 break;
+            case DESCRIBE_TRANSACTIONS:
+                handleDescribeTransactionsRequest(kafkaHeaderAndRequest, responseFuture);
+                break;
             case DELETE_GROUPS:
                 handleDeleteGroupsRequest(kafkaHeaderAndRequest, responseFuture);
                 break;
@@ -589,6 +592,9 @@ protected void handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest,
     protected abstract void
     handleListTransactionsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture<AbstractResponse> response);
 
+    protected abstract void
+    handleDescribeTransactionsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture<AbstractResponse> response);
+
     protected abstract void
     handleDeleteGroupsRequest(KafkaHeaderAndRequest deleteGroups, CompletableFuture<AbstractResponse> response);
 
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
index d888957a95..818d331abe 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
@@ -117,6 +117,7 @@
 import org.apache.kafka.common.message.DescribeClusterResponseData;
 import org.apache.kafka.common.message.DescribeConfigsRequestData;
 import org.apache.kafka.common.message.DescribeConfigsResponseData;
+import org.apache.kafka.common.message.DescribeTransactionsResponseData;
 import org.apache.kafka.common.message.EndTxnRequestData;
 import org.apache.kafka.common.message.EndTxnResponseData;
 import org.apache.kafka.common.message.FetchRequestData;
@@ -161,6 +162,8 @@
 import org.apache.kafka.common.requests.DescribeConfigsRequest;
 import org.apache.kafka.common.requests.DescribeConfigsResponse;
 import org.apache.kafka.common.requests.DescribeGroupsRequest;
+import org.apache.kafka.common.requests.DescribeTransactionsRequest;
+import org.apache.kafka.common.requests.DescribeTransactionsResponse;
 import org.apache.kafka.common.requests.EndTxnRequest;
 import org.apache.kafka.common.requests.EndTxnResponse;
 import org.apache.kafka.common.requests.FetchRequest;
@@ -2043,6 +2046,16 @@ protected void handleListTransactionsRequest(KafkaHeaderAndRequest listTransacti
         resultFuture.complete(new ListTransactionsResponse(listResult));
     }
 
+    @Override
+    protected void handleDescribeTransactionsRequest(KafkaHeaderAndRequest listGroups,
+                                                     CompletableFuture<AbstractResponse> response) {
+        checkArgument(listGroups.getRequest() instanceof DescribeTransactionsRequest);
+        DescribeTransactionsRequest request = (DescribeTransactionsRequest) listGroups.getRequest();
+        DescribeTransactionsResponseData describeResult = getTransactionCoordinator()
+                .handleDescribeTransactions(request.data().transactionalIds());
+        response.complete(new DescribeTransactionsResponse(describeResult));
+    }
+
     @Override
     protected void handleDeleteGroupsRequest(KafkaHeaderAndRequest deleteGroups,
                                              CompletableFuture<AbstractResponse> resultFuture) {
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java
index 0771621228..5b887057a8 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java
@@ -13,6 +13,7 @@
  */
 package io.streamnative.pulsar.handlers.kop.coordinator.transaction;
 
+import static io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState.DEAD;
 import static io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState.ONGOING;
 import static io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState.PREPARE_ABORT;
 import static io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState.PREPARE_COMMIT;
@@ -53,6 +54,7 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.DescribeTransactionsResponseData;
 import org.apache.kafka.common.message.ListTransactionsResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.RecordBatch;
@@ -233,6 +235,76 @@ public ListTransactionsResponseData handleListTransactions(List filtered
         return this.txnManager.listTransactionStates(filteredProducerIds, filteredStates);
     }
 
+    public DescribeTransactionsResponseData handleDescribeTransactions(List<String> transactionalIds) {
+        DescribeTransactionsResponseData response = new DescribeTransactionsResponseData();
+        if (transactionalIds != null) {
+            transactionalIds.forEach(transactionalId -> {
+                DescribeTransactionsResponseData.TransactionState transactionState =
+                        handleDescribeTransactions(transactionalId);
+                response.transactionStates().add(transactionState);
+            });
+        }
+        return response;
+    }
+
+    private DescribeTransactionsResponseData.TransactionState handleDescribeTransactions(String transactionalId) {
+        // https://github.com/apache/kafka/blob/915991445fde106d02e61a70425ae2601c813db0/core/
+        // src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L270
+        if (transactionalId == null) {
+            throw new IllegalArgumentException("Invalid null transactionalId");
+        }
+
+        DescribeTransactionsResponseData.TransactionState transactionState =
+                new DescribeTransactionsResponseData.TransactionState()
+                        .setTransactionalId(transactionalId);
+
+        if (!isActive.get()) {
+            transactionState.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
+        } else if (transactionalId.isEmpty()) {
+            transactionState.setErrorCode(Errors.INVALID_REQUEST.code());
+        } else {
+            Either<Errors, Optional<CoordinatorEpochAndTxnMetadata>> tState =
+                    txnManager.getTransactionState(transactionalId);
+            if (tState.isLeft()) {
+                transactionState.setErrorCode(tState.getLeft().code());
+            } else {
+                Optional<CoordinatorEpochAndTxnMetadata> right = tState.getRight();
+                if (!right.isPresent()) {
+                    transactionState.setErrorCode(Errors.TRANSACTIONAL_ID_NOT_FOUND.code());
+                } else {
+                    CoordinatorEpochAndTxnMetadata coordinatorEpochAndMetadata = right.get();
+                    TransactionMetadata txnMetadata = coordinatorEpochAndMetadata.getTransactionMetadata();
+                    txnMetadata.inLock(() -> {
+                        if (txnMetadata.getState() == DEAD) {
+                            // The transaction state is being expired, so ignore it
+                            transactionState.setErrorCode(Errors.TRANSACTIONAL_ID_NOT_FOUND.code());
+                        } else {
+                            txnMetadata.getTopicPartitions().forEach(topicPartition -> {
+                                var topicData = transactionState.topics().find(topicPartition.topic());
+                                if (topicData == null) {
+                                    topicData = new DescribeTransactionsResponseData.TopicData()
+                                            .setTopic(topicPartition.topic());
+                                    transactionState.topics().add(topicData);
+                                }
+                                topicData.partitions().add(topicPartition.partition());
+                            });
+
+                            transactionState
+                                    .setErrorCode(Errors.NONE.code())
+                                    .setProducerId(txnMetadata.getProducerId())
+                                    .setProducerEpoch(txnMetadata.getProducerEpoch())
+                                    .setTransactionState(txnMetadata.getState().toAdminState().toString())
+                                    .setTransactionTimeoutMs(txnMetadata.getTxnTimeoutMs())
+                                    .setTransactionStartTimeMs(txnMetadata.getTxnStartTimestamp());
+                        }
+                        return null;
+                    });
+                }
+            }
+        }
+        return transactionState;
+    }
+
     @Data
     @EqualsAndHashCode
     @AllArgsConstructor
diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java
index e6abf77605..415846c46d 100644
--- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java
+++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java
@@ -56,6 +56,7 @@
 import org.apache.kafka.clients.admin.ListTransactionsOptions;
 import org.apache.kafka.clients.admin.ListTransactionsResult;
 import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TransactionDescription;
 import org.apache.kafka.clients.admin.TransactionListing;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -87,6 +88,8 @@
 @Slf4j
 public class TransactionTest extends KopProtocolHandlerTestBase {
 
+    private static final int TRANSACTION_TIMEOUT_CONFIG_VALUE = 600 * 1000;
+
     protected void setupTransactions() {
         this.conf.setDefaultNumberOfNamespaceBundles(4);
         this.conf.setOffsetsTopicNumPartitions(10);
@@ -1162,7 +1165,7 @@ private KafkaProducer buildTransactionProducer(String transacti
             producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, txTimeout);
         } else {
             // very long time-out
-            producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 600 * 1000);
+            producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, TRANSACTION_TIMEOUT_CONFIG_VALUE);
         }
         producerProps.put(CLIENT_ID_CONFIG, "dummy_client_" + UUID.randomUUID());
         addCustomizeProps(producerProps);
@@ -1491,10 +1494,10 @@ public void testAbortedTxEventuallyPurged() throws Exception {
         }
     }
 
-    @Test(timeOut = 100000 * 30)
-    public void testListTransactions() throws Exception {
+    @Test(timeOut = 1000 * 30)
+    public void testListAndDescribeTransactions() throws Exception {
 
-        String topicName = "testListTransactions";
+        String topicName = "testListAndDescribeTransactions";
         String transactionalId = "myProducer_" + UUID.randomUUID();
 
         @Cleanup
@@ -1599,116 +1602,33 @@ private static void assertTransactionState(AdminClient kafkaAdmin, String transa
                 .findFirst()
                 .get();
         assertEquals(transactionState, transactionListing.state());
-    }
-
-    @Test(timeOut = 100000 * 30)
-    public void testListTransactions() throws Exception {
-
-        String topicName = "testListTransactions";
-        String transactionalId = "myProducer_" + UUID.randomUUID();
-
-        @Cleanup
-        KafkaProducer<Integer, String> producer = buildTransactionProducer(transactionalId);
-        @Cleanup
-        AdminClient kafkaAdmin = AdminClient.create(newKafkaAdminClientProperties());
-
-        producer.initTransactions();
-        producer.beginTransaction();
-        assertTransactionState(kafkaAdmin, transactionalId,
-                org.apache.kafka.clients.admin.TransactionState.EMPTY);
-        producer.send(new ProducerRecord<>(topicName, 1, "bar")).get();
-        producer.flush();
-
-        ListTransactionsResult listTransactionsResult = kafkaAdmin.listTransactions();
-        listTransactionsResult.all().get().forEach(t -> {
-            log.info("Found transactionalId: {} {} {}",
-                    t.transactionalId(),
-                    t.producerId(),
-                    t.state());
-        });
-        assertTransactionState(kafkaAdmin, transactionalId,
-                org.apache.kafka.clients.admin.TransactionState.ONGOING);
-        Awaitility.await().untilAsserted(() -> {
-            assertTransactionState(kafkaAdmin, transactionalId,
-                    org.apache.kafka.clients.admin.TransactionState.ONGOING);
-        });
-        producer.commitTransaction();
-        Awaitility.await().untilAsserted(() -> {
-            assertTransactionState(kafkaAdmin, transactionalId,
-                    org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT);
-        });
-        producer.beginTransaction();
-
-        assertTransactionState(kafkaAdmin, transactionalId,
-                org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT);
-
-        producer.send(new ProducerRecord<>(topicName, 1, "bar")).get();
-        producer.flush();
-        producer.abortTransaction();
-        Awaitility.await().untilAsserted(() -> {
-            assertTransactionState(kafkaAdmin, transactionalId,
-                    org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT);
-        });
-        producer.close();
-        assertTransactionState(kafkaAdmin, transactionalId,
-                org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT);
-    }
-    private static void assertTransactionState(AdminClient kafkaAdmin, String transactionalId,
-                                               org.apache.kafka.clients.admin.TransactionState transactionState)
-            throws Exception {
-        ListTransactionsResult listTransactionsResult = kafkaAdmin.listTransactions();
-        Collection<TransactionListing> transactionListings = listTransactionsResult.all().get();
-        transactionListings.forEach(t -> {
-            log.info("Found transactionalId: {} {} {}",
-                    t.transactionalId(),
-                    t.producerId(),
-                    t.state());
-        });
-        TransactionListing transactionListing = transactionListings
-                .stream()
-                .filter(t -> t.transactionalId().equals(transactionalId))
-                .findFirst()
-                .get();
-        assertEquals(transactionState, transactionListing.state());
-
-        // filter for the same state
-        ListTransactionsOptions optionFilterState = new ListTransactionsOptions()
-                .filterStates(Collections.singleton(transactionState));
-        listTransactionsResult = kafkaAdmin.listTransactions(optionFilterState);
-        transactionListings = listTransactionsResult.all().get();
-        transactionListing = transactionListings
-                .stream()
-                .filter(t -> t.transactionalId().equals(transactionalId))
-                .findFirst()
-                .get();
-        assertEquals(transactionState, transactionListing.state());
-
-
-        // filter for the same producer id
-        ListTransactionsOptions optionFilterProducer = new ListTransactionsOptions()
-                .filterProducerIds(Collections.singleton(transactionListing.producerId()));
-        listTransactionsResult = kafkaAdmin.listTransactions(optionFilterProducer);
-        transactionListings = listTransactionsResult.all().get();
-        transactionListing = transactionListings
-                .stream()
-                .filter(t -> t.transactionalId().equals(transactionalId))
-                .findFirst()
-                .get();
-        assertEquals(transactionState, transactionListing.state());
+        Map<String, TransactionDescription> map =
+                kafkaAdmin.describeTransactions(Collections.singleton(transactionalId))
+                        .all().get();
+        assertEquals(1, map.size());
+        TransactionDescription transactionDescription = map.get(transactionalId);
+        log.info("transactionDescription {}", transactionDescription);
+        assertNotNull(transactionDescription);
+        assertEquals(transactionDescription.state(), transactionState);
+        assertTrue(transactionDescription.producerEpoch() >= 0);
+        assertEquals(TRANSACTION_TIMEOUT_CONFIG_VALUE, transactionDescription.transactionTimeoutMs());
+        assertTrue(transactionDescription.transactionStartTimeMs().isPresent());
+        assertTrue(transactionDescription.coordinatorId() >= 0);
+
+        switch (transactionState) {
+            case EMPTY:
+            case COMPLETE_COMMIT:
+            case COMPLETE_ABORT:
+                assertEquals(0, transactionDescription.topicPartitions().size());
+                break;
+            case ONGOING:
+                assertEquals(1, transactionDescription.topicPartitions().size());
+                break;
+            default:
+                fail("unhandled " + transactionState);
+        }
 
-        // filter for the same producer id and state
-        ListTransactionsOptions optionFilterProducerAndState = new ListTransactionsOptions()
-                .filterStates(Collections.singleton(transactionState))
-                .filterProducerIds(Collections.singleton(transactionListing.producerId()));
-        listTransactionsResult = kafkaAdmin.listTransactions(optionFilterProducerAndState);
-        transactionListings = listTransactionsResult.all().get();
-        transactionListing = transactionListings
-                .stream()
-                .filter(t -> t.transactionalId().equals(transactionalId))
-                .findFirst()
-                .get();
-        assertEquals(transactionState, transactionListing.state());
     }
 
     /**