From a6e35f92fbfcba62cab6070b7d5b6b2b82d3c6c1 Mon Sep 17 00:00:00 2001 From: Magnus Edenhill Date: Mon, 11 Jan 2021 09:13:43 +0100 Subject: [PATCH] Txn integration tests: rely on KIP-447 --- kafka/txn_integration_test.go | 35 ++++++++++++++--------------------- 1 file changed, 14 insertions(+), 21 deletions(-) diff --git a/kafka/txn_integration_test.go b/kafka/txn_integration_test.go index 4d2c57461..4cd298465 100644 --- a/kafka/txn_integration_test.go +++ b/kafka/txn_integration_test.go @@ -297,6 +297,19 @@ func TestTransactionalSendOffsets(t *testing.T) { } } + // Create consumer (to read committed offsets) prior to closing the + // consumer to trigger the race condition where the transaction is + // not fully committed by the time consumer.Committed() is called. + // Prior to KIP-447 this would result in the committed offsets not + // showing up, but with KIP-447 the consumer automatically retries + // the offset retrieval. + t.Logf("Creating consumer for (later) offset retrieval\n") + consumer, err := NewConsumer(consumerConfig) + if err != nil { + t.Fatalf("Failed to create Consumer client: %s\n", err) + } + + // Close producer // signal go-routine to finish close(termChan) // wait for go-routine to finish @@ -304,27 +317,7 @@ func TestTransactionalSendOffsets(t *testing.T) { producer.Close() - // Until KIP-447 is implemented we need to call - // InitTransactions() after transactions are committed to make - // sure the committed offsets are made available to consumers. - producer2, err := NewProducer(config) - if err != nil { - t.Fatalf("Failed to create Producer client: %s\n", err) - } - - err = producer2.InitTransactions(nil) - if err != nil { - t.Fatalf("InitTransactions() failed: %v\n", err) - } - - producer2.Close() - - // Read committed offsets. - consumer, err := NewConsumer(consumerConfig) - if err != nil { - t.Fatalf("Failed to create Consumer client: %s\n", err) - } - + t.Logf("Retrieving committed offsets\n") committed, err := consumer.Committed(partitions, -1) if err != nil { t.Fatalf("Failed to get committed offsets: %s\n", err)