Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIP 848:Extend DescribeConfigs and IncrementalAlterConfigs to support GROUP Config #1344

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions kafka/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2007,6 +2007,63 @@ func (its *IntegrationTestSuite) TestAdminConfig() {
if topicResult[0].Error.Code() != ErrNoError {
t.Fatalf("Failed to delete topic %s: %s", topic, topicResult[0].Error)
}

if !testConsumerGroupProtocolClassic() {
// New test case for ResourceType.Group (ResourceGroup)
groupID := fmt.Sprintf("test-group-%d", rand.Intn(100000))

// Incremental Alter Configs for group
t.Logf("Incrementally altering configs for consumer group %s", groupID)
groupConfig := map[string]string{
"consumer.session.timeout.ms": "50000",
}
groupOps := map[string]AlterConfigOpType{
"consumer.session.timeout.ms": AlterConfigOpTypeSet,
}

groupConfigResource := ConfigResource{
Type: ResourceGroup,
Name: groupID,
Config: StringMapToIncrementalConfigEntries(groupConfig, groupOps),
}

ctx, cancel = context.WithCancel(context.Background())
defer cancel()

// Perform IncrementalAlterConfigs
t.Logf("Performing IncrementalAlterConfigs for group %s", groupID)
alterRes, err = a.IncrementalAlterConfigs(ctx, []ConfigResource{groupConfigResource})
if err != nil {
t.Fatalf("IncrementalAlterConfigs request failed for group %s: %v", groupID, err)
}

// Expected results
expectedGroupConfig := []ConfigResourceResult{
{
Type: ResourceGroup,
Name: groupID,
Config: map[string]ConfigEntryResult{
"consumer.session.timeout.ms": {
Name: "consumer.session.timeout.ms",
Value: "50000",
},
},
},
}

// Validate results from IncrementalAlterConfigs
validateConfig(t, alterRes, expectedGroupConfig, false)

// Read back group config to verify
t.Logf("Describing configs for group %s", groupID)
describeRes, err = a.DescribeConfigs(ctx, []ConfigResource{{Type: ResourceGroup, Name: groupID}})
if err != nil {
t.Fatalf("DescribeConfigs request failed for group %s: %v", groupID, err)
}

// Validate described configs
validateConfig(t, describeRes, expectedGroupConfig, true)
}
}

// Test AdminClient GetMetadata API
Expand Down