From 16ef7eff97dc46a96436a0b8a79e778594c32307 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Thu, 21 Nov 2024 12:28:29 +0530 Subject: [PATCH] added integration test for verifying group support in config apis --- kafka/integration_test.go | 57 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/kafka/integration_test.go b/kafka/integration_test.go index d0fa2eb0d..5c6cb9b8e 100644 --- a/kafka/integration_test.go +++ b/kafka/integration_test.go @@ -2007,6 +2007,63 @@ func (its *IntegrationTestSuite) TestAdminConfig() { if topicResult[0].Error.Code() != ErrNoError { t.Fatalf("Failed to delete topic %s: %s", topic, topicResult[0].Error) } + + if !testConsumerGroupProtocolClassic() { + // New test case for ResourceType.Group (ResourceGroup) + groupID := fmt.Sprintf("test-group-%d", rand.Intn(100000)) + + // Incremental Alter Configs for group + t.Logf("Incrementally altering configs for consumer group %s", groupID) + groupConfig := map[string]string{ + "consumer.session.timeout.ms": "50000", + } + groupOps := map[string]AlterConfigOpType{ + "consumer.session.timeout.ms": AlterConfigOpTypeSet, + } + + groupConfigResource := ConfigResource{ + Type: ResourceGroup, + Name: groupID, + Config: StringMapToIncrementalConfigEntries(groupConfig, groupOps), + } + + ctx, cancel = context.WithCancel(context.Background()) + defer cancel() + + // Perform IncrementalAlterConfigs + t.Logf("Performing IncrementalAlterConfigs for group %s", groupID) + alterRes, err = a.IncrementalAlterConfigs(ctx, []ConfigResource{groupConfigResource}) + if err != nil { + t.Fatalf("IncrementalAlterConfigs request failed for group %s: %v", groupID, err) + } + + // Expected results + expectedGroupConfig := []ConfigResourceResult{ + { + Type: ResourceGroup, + Name: groupID, + Config: map[string]ConfigEntryResult{ + "consumer.session.timeout.ms": { + Name: "consumer.session.timeout.ms", + Value: "50000", + }, + }, + }, + } + + // Validate results from IncrementalAlterConfigs + validateConfig(t, alterRes, expectedGroupConfig, false) + + // Read back group config to verify + t.Logf("Describing configs for group %s", groupID) + describeRes, err = a.DescribeConfigs(ctx, []ConfigResource{{Type: ResourceGroup, Name: groupID}}) + if err != nil { + t.Fatalf("DescribeConfigs request failed for group %s: %v", groupID, err) + } + + // Validate described configs + validateConfig(t, describeRes, expectedGroupConfig, true) + } } // Test AdminClient GetMetadata API