Argument to Support Server-Side Calculated Replication Factor #419

Open · wants to merge 1 commit into base: main
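
As a sketch of the intended usage, pieced together from the schema change and the acceptance-test config further down (resource name and values here are illustrative): setting managed_replication_factor = true tells the provider to stop diffing and updating replication_factor and to leave that decision to the server. replication_factor itself remains a required argument; its value is simply ignored while the flag is set.

resource "kafka_topic" "example" {
  name                       = "example-topic"
  partitions                 = 4
  replication_factor         = 1 # still required by the schema, but ignored while managed
  managed_replication_factor = true

  config = {
    "retention.ms" = "86400000"
  }
}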
51 changes: 39 additions & 12 deletions kafka/resource_kafka_topic.go
@@ -36,11 +36,20 @@ func kafkaTopicResource() *schema.Resource {
ValidateFunc: validation.IntAtLeast(1),
},
"replication_factor": {
Type: schema.TypeInt,
Required: true,
ForceNew: false,
Description: "Number of replicas.",
ValidateFunc: validation.IntAtLeast(1),
Type: schema.TypeInt,
Required: true,
ForceNew: false,
Description: "Number of replicas.",
ValidateFunc: validation.IntAtLeast(1),
DiffSuppressFunc: checkManagedReplicationFactor,
},
"managed_replication_factor": {
Type: schema.TypeBool,
Required: false,
Optional: true,
ForceNew: false,
Default: false,
Description: "Replication factor is managed by server-side.",
},
"config": {
Type: schema.TypeMap,
@@ -101,8 +110,11 @@ func topicUpdate(ctx context.Context, d *schema.ResourceData, meta interface{})
return diag.FromErr(err)
}

// update replica count of existing partitions before adding new ones
if d.HasChange("replication_factor") {
managedReplicationFactor := d.Get("managed_replication_factor").(bool)
if managedReplicationFactor {
log.Printf("[INFO] Ignoring replication factor")
} else if d.HasChange("replication_factor") {
// update replica count of existing partitions before adding new ones
oi, ni := d.GetChange("replication_factor")
oldRF := oi.(int)
newRF := ni.(int)
@@ -131,7 +143,7 @@ func topicUpdate(ctx context.Context, d *schema.ResourceData, meta interface{})
}
}

if err := waitForTopicRefresh(ctx, c, d.Id(), t); err != nil {
if err := waitForTopicRefresh(ctx, c, d.Id(), t, managedReplicationFactor); err != nil {
return diag.FromErr(err)
}

@@ -170,12 +182,12 @@ func waitForRFUpdate(ctx context.Context, client *LazyClient, topic string) error
return nil
}

func waitForTopicRefresh(ctx context.Context, client *LazyClient, topic string, expected Topic) error {
func waitForTopicRefresh(ctx context.Context, client *LazyClient, topic string, expected Topic, managedReplicationFactor bool) error {
timeout := time.Duration(client.Config.Timeout) * time.Second
stateConf := &retry.StateChangeConf{
Pending: []string{"Updating"},
Target: []string{"Ready"},
Refresh: topicRefreshFunc(client, topic, expected),
Refresh: topicRefreshFunc(client, topic, expected, managedReplicationFactor),
Timeout: timeout,
Delay: 1 * time.Second,
PollInterval: 1 * time.Second,
@@ -191,7 +203,7 @@ func waitForTopicRefresh(ctx context.Context, client *LazyClient, topic string,
return nil
}

func topicRefreshFunc(client *LazyClient, topic string, expected Topic) retry.StateRefreshFunc {
func topicRefreshFunc(client *LazyClient, topic string, expected Topic, managedReplicationFactor bool) retry.StateRefreshFunc {
return func() (result interface{}, s string, err error) {
log.Printf("[DEBUG] waiting for topic to update %s", topic)
actual, err := client.ReadTopic(topic, true)
@@ -200,6 +212,12 @@ func topicRefreshFunc(client *LazyClient, topic string, expected Topic) retry.StateRefreshFunc
return actual, "Error", err
}

// If the replication factor is managed server-side,
// ignore it in the comparison with the expected topic.
if managedReplicationFactor {
actual.ReplicationFactor = -1
}

if expected.Equal(actual) {
return actual, "Ready", nil
}
@@ -274,6 +292,7 @@ func topicRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics
errSet.Set("name", topic.Name)
errSet.Set("partitions", topic.Partitions)
errSet.Set("replication_factor", topic.ReplicationFactor)
errSet.Set("managed_replication_factor", d.Get("managed_replication_factor"))
errSet.Set("config", topic.Config)

if errSet.err != nil {
@@ -302,7 +321,11 @@ func customDiff(ctx context.Context, diff *schema.ResourceDiff, v interface{}) error
}
}

if diff.HasChange("replication_factor") {
managedReplicationFactor := diff.Get("managed_replication_factor").(bool)
if managedReplicationFactor {
log.Printf("[INFO] Ignoring replication factor")

} else if diff.HasChange("replication_factor") {
log.Printf("[INFO] Checking the diff!")
client := v.(*LazyClient)

@@ -321,3 +344,7 @@ func customDiff(ctx context.Context, diff *schema.ResourceDiff, v interface{}) error

return nil
}

// checkManagedReplicationFactor suppresses plan diffs on replication_factor
// whenever managed_replication_factor is enabled for the topic.
func checkManagedReplicationFactor(k, old, new string, d *schema.ResourceData) bool {
return d.Get("managed_replication_factor").(bool)
}
109 changes: 109 additions & 0 deletions kafka/resource_kafka_topic_test.go
@@ -175,6 +175,77 @@ func TestAcc_TopicAlterReplicationFactor(t *testing.T) {
})
}

func TestAcc_TopicAlterBetweenManagedReplicationFactorAndUnmanaged(t *testing.T) {
t.Parallel()
u, err := uuid.GenerateUUID()
if err != nil {
t.Fatal(err)
}
topicName := fmt.Sprintf("syslog-%s", u)
bs := testBootstrapServers[0]

keyEncoder := sarama.StringEncoder("same key -> same partition -> same ordering")
messages := []*sarama.ProducerMessage{
{
Topic: topicName,
Key: keyEncoder,
Value: sarama.StringEncoder("Krusty"),
},
{
Topic: topicName,
Key: keyEncoder,
Value: sarama.StringEncoder("Krab"),
},
{
Topic: topicName,
Key: keyEncoder,
Value: sarama.StringEncoder("Pizza"),
},
}

r.Test(t, r.TestCase{
ProviderFactories: overrideProviderFactory(),
PreCheck: func() { testAccPreCheck(t) },
CheckDestroy: testAccCheckTopicDestroy,
Steps: []r.TestStep{
{
Config: cfg(t, bs, fmt.Sprintf(testResourceTopic_updateRF, topicName, 1, 3)),
Check: r.ComposeTestCheckFunc(
testResourceTopic_produceMessages(messages),
testResourceTopic_initialCheck),
},
{
// Test altering from unmanaged replication factor to managed replication factor with same values
Config: cfg(t, bs, fmt.Sprintf(testResourceTopic_managedReplicationFactor, topicName, 1, 3)),
Check: r.ComposeTestCheckFunc(
testResourceTopic_updateManagedRFCheck,
testResourceTopic_checkSameMessages(messages)),
},
{
// Test updating partitions in managed replication factor
Config: cfg(t, bs, fmt.Sprintf(testResourceTopic_managedReplicationFactor, topicName, 1, 4)),
Check: r.ComposeTestCheckFunc(
testResourceTopic_updateManagedRFCheck,
testResourceTopic_checkSameMessages(messages)),
},
{
// Test updating replication factor while managed replication factor
Config: cfg(t, bs, fmt.Sprintf(testResourceTopic_managedReplicationFactor, topicName, 2, 4)),
Check: r.ComposeTestCheckFunc(
testResourceTopic_updateManagedRFCheck,
testResourceTopic_checkSameMessages(messages)),
},
{
// Test switching to unmanaged replication factor
Config: cfg(t, bs, fmt.Sprintf(testResourceTopic_updateRF, topicName, 1, 5)),
Check: r.ComposeTestCheckFunc(
testResourceTopic_updateRFCheck,
testResourceTopic_checkSameMessages(messages)),
},
},
})
}

func testResourceTopic_noConfigCheck(s *terraform.State) error {
resourceState := s.Modules[0].Resources["kafka_topic.test"]
if resourceState == nil {
@@ -354,6 +425,30 @@ func testResourceTopic_updateRFCheck(s *terraform.State) error {
return nil
}

// testResourceTopic_updateManagedRFCheck only asserts the partition count; the
// replication factor is deliberately not checked because it is managed server-side.
func testResourceTopic_updateManagedRFCheck(s *terraform.State) error {
resourceState := s.Modules[0].Resources["kafka_topic.test"]
instanceState := resourceState.Primary
client := testProvider.Meta().(*LazyClient)
topicName := instanceState.Attributes["name"]

parsed, err := strconv.ParseInt(instanceState.Attributes["partitions"], 10, 32)
if err != nil {
return err
}
expectedPartitions := int32(parsed)

topic, err := client.ReadTopic(topicName, true)
if err != nil {
return err
}

if actual := topic.Partitions; actual != expectedPartitions {
return fmt.Errorf("expected %d partitions, but got %d", expectedPartitions, actual)
}

return nil
}

func testResourceTopic_checkSameMessages(producedMessages []*sarama.ProducerMessage) r.TestCheckFunc {
return func(s *terraform.State) error {
resourceState := s.Modules[0].Resources["kafka_topic.test"]
@@ -509,3 +604,17 @@ resource "kafka_topic" "test" {
}
}
`

const testResourceTopic_managedReplicationFactor = `
resource "kafka_topic" "test" {
name = "%s"
replication_factor = %d
partitions = %d
managed_replication_factor = true

config = {
"retention.ms" = "11111"
"segment.ms" = "22222"
}
}
`
5 changes: 5 additions & 0 deletions kafka/topic.go
@@ -72,6 +72,11 @@ func metaToTopic(d *schema.ResourceData, meta interface{}) Topic {
convertedPartitions := int32(partitions)
convertedRF := int16(replicationFactor)
config := d.Get("config").(map[string]interface{})
managedReplicationFactor := d.Get("managed_replication_factor").(bool)

if managedReplicationFactor {
// -1 leaves the replication factor to the server, which substitutes its own
// default or calculated value when the topic is created.
convertedRF = -1
}

m2 := make(map[string]*string)
for key, value := range config {