diff --git a/README.md b/README.md index 507762e3..00bafb0f 100644 --- a/README.md +++ b/README.md @@ -164,6 +164,17 @@ The `create` command creates resources in the cluster from a configuration file. Currently, only ACLs are supported. The create command is separate from the apply command as it is intended for usage with immutable resources managed by topicctl. +#### delete +``` +topicctl delete [flags] [operation] +``` + +The `delete` subcommand deletes a particular resource type in the cluster. +Currently, the following operations are supported: +| Subcommand | Description | +| --------- | ----------- | +| `delete acl [flags]` | Deletes a single ACL in the cluster matching the provided flags | + #### get ``` diff --git a/cmd/topicctl/subcmd/create.go b/cmd/topicctl/subcmd/create.go index 09a1299b..7bef8883 100644 --- a/cmd/topicctl/subcmd/create.go +++ b/cmd/topicctl/subcmd/create.go @@ -8,10 +8,10 @@ import ( "path/filepath" "syscall" + "github.com/segmentio/topicctl/pkg/acl" "github.com/segmentio/topicctl/pkg/admin" "github.com/segmentio/topicctl/pkg/cli" "github.com/segmentio/topicctl/pkg/config" - "github.com/segmentio/topicctl/pkg/create" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) @@ -171,14 +171,14 @@ func createACL( clusterConfigPath, ) - creatorConfig := create.ACLCreatorConfig{ + aclAdminConfig := acl.ACLAdminConfig{ DryRun: createConfig.dryRun, SkipConfirm: createConfig.skipConfirm, ACLConfig: aclConfig, ClusterConfig: clusterConfig, } - if err := cliRunner.CreateACL(ctx, creatorConfig); err != nil { + if err := cliRunner.CreateACL(ctx, aclAdminConfig); err != nil { return err } } diff --git a/cmd/topicctl/subcmd/delete.go b/cmd/topicctl/subcmd/delete.go new file mode 100644 index 00000000..91f6345d --- /dev/null +++ b/cmd/topicctl/subcmd/delete.go @@ -0,0 +1,147 @@ +package subcmd + +import ( + "context" + "strings" + + "github.com/aws/aws-sdk-go/aws/session" + "github.com/segmentio/kafka-go" + "github.com/segmentio/topicctl/pkg/acl" + "github.com/segmentio/topicctl/pkg/cli" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" +) + +var deleteCmd = &cobra.Command{ + Use: "delete [resource type]", + Short: "delete instances of a particular type", + Long: strings.Join( + []string{ + "Deletes instances of a particular type.", + }, + "\n", + ), + PersistentPreRunE: deletePreRun, +} + +type deleteCmdConfig struct { + dryRun bool + + shared sharedOptions +} + +var deleteConfig deleteCmdConfig + +func init() { + deleteCmd.PersistentFlags().BoolVar( + &deleteConfig.dryRun, + "dry-run", + false, + "Do a dry-run", + ) + + addSharedFlags(deleteCmd, &deleteConfig.shared) + deleteCmd.AddCommand( + deleteACLCmd(), + ) + RootCmd.AddCommand(deleteCmd) +} + +func deletePreRun(cmd *cobra.Command, args []string) error { + return deleteConfig.shared.validate() +} + +var deleteACLsConfig = aclsCmdConfig{} + +func deleteACLCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "acls [flags]", + Short: "Delete ACLs. Requires providing flags to target ACLs for deletion.", + Args: cobra.NoArgs, + Example: `Delete read acls for topic my-topic, user 'User:default', and host '*' +$ topicctl delete acls --resource-type topic --resource-pattern-type literal --resource-name my-topic --principal 'User:default' --host '*' --operation read --permission-type allow +`, + RunE: func(cmd *cobra.Command, args []string) error { + ctx := context.Background() + sess := session.Must(session.NewSession()) + + adminClient, err := deleteConfig.shared.getAdminClient(ctx, sess, deleteConfig.dryRun) + if err != nil { + return err + } + defer adminClient.Close() + + cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner) + + filter := kafka.DeleteACLsFilter{ + ResourceTypeFilter: kafka.ResourceType(deleteACLsConfig.resourceType), + ResourceNameFilter: deleteACLsConfig.resourceNameFilter, + ResourcePatternTypeFilter: kafka.PatternType(deleteACLsConfig.resourcePatternType), + PrincipalFilter: deleteACLsConfig.principalFilter, + HostFilter: deleteACLsConfig.hostFilter, + Operation: kafka.ACLOperationType(deleteACLsConfig.operationType), + PermissionType: kafka.ACLPermissionType(deleteACLsConfig.permissionType), + } + + aclAdminConfig := acl.ACLAdminConfig{ + // Omit fields we don't need for deletes + DryRun: deleteConfig.dryRun, + // Deletes cannot be skipped + SkipConfirm: false, + } + + return cliRunner.DeleteACL(ctx, aclAdminConfig, filter) + }, + } + cmd.Flags().StringVar( + &deleteACLsConfig.hostFilter, + "host", + "", + `The host to filter on. (e.g. 198.51.100.0) (Required)`, + ) + cmd.MarkFlagRequired("host") + + cmd.Flags().Var( + &deleteACLsConfig.operationType, + "operation", + `The operation that is being allowed or denied to filter on. allowed: [any, all, read, write, create, delete, alter, describe, clusteraction, describeconfigs, alterconfigs, idempotentwrite] (Required)`, + ) + cmd.MarkFlagRequired("operation") + + cmd.Flags().Var( + &deleteACLsConfig.permissionType, + "permission-type", + `The permission type to filter on. allowed: [any, allow, deny] (Required)`, + ) + cmd.MarkFlagRequired("permission-type") + + cmd.Flags().StringVar( + &deleteACLsConfig.principalFilter, + "principal", + "", + `The principal to filter on in principalType:name format (e.g. User:alice). (Required)`, + ) + cmd.MarkFlagRequired("principal") + + cmd.Flags().StringVar( + &deleteACLsConfig.resourceNameFilter, + "resource-name", + "", + `The resource name to filter on. (e.g. my-topic) (Required)`, + ) + cmd.MarkFlagRequired("resource-name") + + cmd.Flags().Var( + &deleteACLsConfig.resourcePatternType, + "resource-pattern-type", + `The type of the resource pattern or filter. allowed: [any, match, literal, prefixed]. "any" will match any pattern type (literal or prefixed), but will match the resource name exactly, where as "match" will perform pattern matching to list all acls that affect the supplied resource(s).`, + ) + + cmd.Flags().Var( + &deleteACLsConfig.resourceType, + "resource-type", + `The type of resource to filter on. allowed: [any, topic, group, cluster, transactionalid, delegationtoken] (Required)`, + ) + cmd.MarkFlagRequired("resource-type") + return cmd +} diff --git a/pkg/acl/acl.go b/pkg/acl/acl.go new file mode 100644 index 00000000..48f02d76 --- /dev/null +++ b/pkg/acl/acl.go @@ -0,0 +1,234 @@ +package acl + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strings" + + "github.com/segmentio/kafka-go" + "github.com/segmentio/topicctl/pkg/admin" + "github.com/segmentio/topicctl/pkg/config" + "github.com/segmentio/topicctl/pkg/util" + log "github.com/sirupsen/logrus" +) + +// ACLCreatorConfig contains the configuration for an ACL admin. +type ACLAdminConfig struct { + ClusterConfig config.ClusterConfig + DryRun bool + SkipConfirm bool + ACLConfig config.ACLConfig +} + +// ACLAdmin executes operations on ACLs by comparing the current ACLs with the desired ACLs. +type ACLAdmin struct { + config ACLAdminConfig + adminClient admin.Client + + clusterConfig config.ClusterConfig + aclConfig config.ACLConfig +} + +func NewACLAdmin( + ctx context.Context, + adminClient admin.Client, + aclAdminConfig ACLAdminConfig, +) (*ACLAdmin, error) { + if !adminClient.GetSupportedFeatures().ACLs { + return nil, fmt.Errorf("ACLs are not supported by this cluster") + } + + return &ACLAdmin{ + config: aclAdminConfig, + adminClient: adminClient, + clusterConfig: aclAdminConfig.ClusterConfig, + aclConfig: aclAdminConfig.ACLConfig, + }, nil +} + +// Create creates ACLs that do not already exist based on the ACL config. +func (a *ACLAdmin) Create(ctx context.Context) error { + log.Info("Validating configs...") + + if err := a.clusterConfig.Validate(); err != nil { + return err + } + + if err := a.aclConfig.Validate(); err != nil { + return err + } + + if err := config.CheckConsistency(a.aclConfig.Meta, a.clusterConfig); err != nil { + return err + } + + log.Info("Checking if ACLs already exists...") + + acls := a.aclConfig.ToNewACLEntries() + + allExistingACLs := []kafka.ACLEntry{} + newACLs := []kafka.ACLEntry{} + + for _, acl := range acls { + existingACLs, err := a.adminClient.GetACLs(ctx, kafka.ACLFilter{ + ResourceTypeFilter: acl.ResourceType, + ResourceNameFilter: acl.ResourceName, + ResourcePatternTypeFilter: acl.ResourcePatternType, + PrincipalFilter: acl.Principal, + HostFilter: acl.Host, + Operation: acl.Operation, + PermissionType: acl.PermissionType, + }) + if err != nil { + return fmt.Errorf("error checking for existing ACL (%v): %v", acl, err) + } + if len(existingACLs) > 0 { + allExistingACLs = append(allExistingACLs, acl) + } else { + newACLs = append(newACLs, acl) + } + } + + if len(allExistingACLs) > 0 { + log.Infof( + "Found %d existing ACLs:\n%s", + len(allExistingACLs), + formatNewACLsConfig(allExistingACLs), + ) + } + + if len(newACLs) == 0 { + log.Infof("No ACLs to create") + return nil + } + + if a.config.DryRun { + log.Infof( + "Would create ACLs with config %+v", + formatNewACLsConfig(newACLs), + ) + return nil + } + + log.Infof( + "It looks like these ACLs don't already exist. Will create them with this config:\n%s", + formatNewACLsConfig(newACLs), + ) + + ok, _ := util.Confirm("OK to continue?", a.config.SkipConfirm) + if !ok { + return errors.New("Stopping because of user response") + } + + log.Infof("Creating new ACLs for user with config %+v", formatNewACLsConfig(newACLs)) + + if err := a.adminClient.CreateACLs(ctx, acls); err != nil { + return fmt.Errorf("error creating new ACLs: %v", err) + } + + return nil +} + +// formatNewACLsConfig generates a pretty string representation of kafka-go +// ACL configurations. +func formatNewACLsConfig(config []kafka.ACLEntry) string { + content, err := json.MarshalIndent(config, "", " ") + if err != nil { + log.Warnf("Error marshalling ACLs config: %+v", err) + return "Error" + } + + return string(content) +} + +// Delete checks if ACLs exist and deletes them if they do. +func (a *ACLAdmin) Delete(ctx context.Context, filter kafka.DeleteACLsFilter) error { + log.Infof("Checking if ACLs exists for filter:\n%+v", formatACLs(filter)) + + getFilter := kafka.ACLFilter{ + ResourceTypeFilter: filter.ResourceTypeFilter, + ResourceNameFilter: filter.ResourceNameFilter, + ResourcePatternTypeFilter: filter.ResourcePatternTypeFilter, + PrincipalFilter: filter.PrincipalFilter, + HostFilter: filter.HostFilter, + Operation: filter.Operation, + PermissionType: filter.PermissionType, + } + clusterACLs, err := a.adminClient.GetACLs(ctx, getFilter) + + if err != nil { + return fmt.Errorf("Error fetching ACL info: \n%+v", err) + } + + if len(clusterACLs) == 0 { + return fmt.Errorf("No ACL matches filter:\n%+v", formatACLs(filter)) + } + + log.Infof("The following ACLs in the cluster are planned for deletion:\n%+v", formatACLInfos(clusterACLs)) + + if a.config.DryRun { + log.Infof("Would delete ACLs:\n%+v", formatACLInfos(clusterACLs)) + return nil + } + + // This isn't settable by the CLI for safety measures but allows for testability + confirm, err := util.Confirm("Delete ACLs?", a.config.SkipConfirm) + if err != nil { + return err + } + + if !confirm { + return errors.New("Stopping because of user response") + } + + resp, err := a.adminClient.DeleteACLs(ctx, []kafka.DeleteACLsFilter{filter}) + + if err != nil { + return err + } + + var respErrors = []error{} + var deletedACLs = []kafka.DeleteACLsMatchingACLs{} + + for _, result := range resp.Results { + if result.Error != nil { + respErrors = append(respErrors, result.Error) + } + for _, matchingACL := range result.MatchingACLs { + if matchingACL.Error != nil { + respErrors = append(respErrors, result.Error) + } + deletedACLs = append(deletedACLs, matchingACL) + } + } + + if len(respErrors) > 0 { + return fmt.Errorf("Got errors while deleting ACLs: \n%+v", respErrors) + } + + log.Infof("ACLs successfully deleted: %+v", formatACLs(deletedACLs)) + + return nil +} + +func formatACLs(acls interface{}) string { + content, err := json.MarshalIndent(acls, "", " ") + if err != nil { + log.Warnf("Error marshalling acls: %+v", err) + return "Error" + } + + return string(content) +} + +func formatACLInfos(acls []admin.ACLInfo) string { + aclsString := []string{} + + for _, acl := range acls { + aclsString = append(aclsString, admin.FormatACLInfo(acl)) + } + + return strings.Join(aclsString, "\n") +} diff --git a/pkg/acl/acl_test.go b/pkg/acl/acl_test.go new file mode 100644 index 00000000..b377146f --- /dev/null +++ b/pkg/acl/acl_test.go @@ -0,0 +1,691 @@ +package acl + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/segmentio/kafka-go" + "github.com/segmentio/topicctl/pkg/admin" + "github.com/segmentio/topicctl/pkg/config" + "github.com/segmentio/topicctl/pkg/util" + "github.com/stretchr/testify/require" +) + +func TestCreateNewACLs(t *testing.T) { + if !util.CanTestBrokerAdminSecurity() { + t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN_SECURITY is not set") + } + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + principal := util.RandomString("User:acl-create-", 6) + topicName := util.RandomString("acl-create-", 6) + + aclConfig := config.ACLConfig{ + Meta: config.ResourceMeta{ + Name: "test-acl", + Cluster: "test-cluster", + Region: "test-region", + Environment: "test-environment", + }, + Spec: config.ACLSpec{ + ACLs: []config.ACL{ + { + Resource: config.ACLResource{ + Type: kafka.ResourceTypeTopic, + Name: topicName, + PatternType: kafka.PatternTypeLiteral, + Principal: principal, + Host: "*", + Permission: kafka.ACLPermissionTypeAllow, + }, + Operations: []kafka.ACLOperationType{ + kafka.ACLOperationTypeRead, + }, + }, + }, + }, + } + aclAdmin := testACLAdmin(ctx, t, aclConfig) + defer aclAdmin.adminClient.Close() + + defer func() { + _, err := aclAdmin.adminClient.GetConnector().KafkaClient.DeleteACLs(ctx, + &kafka.DeleteACLsRequest{ + Filters: []kafka.DeleteACLsFilter{ + { + ResourceTypeFilter: kafka.ResourceTypeTopic, + ResourceNameFilter: topicName, + ResourcePatternTypeFilter: kafka.PatternTypeLiteral, + PrincipalFilter: principal, + HostFilter: "*", + PermissionType: kafka.ACLPermissionTypeAllow, + Operation: kafka.ACLOperationTypeRead, + }, + }, + }, + ) + + if err != nil { + t.Fatal(fmt.Errorf("failed to clean up ACL, err: %v", err)) + } + }() + err := aclAdmin.Create(ctx) + require.NoError(t, err) + acl, err := aclAdmin.adminClient.GetACLs(ctx, kafka.ACLFilter{ + ResourceTypeFilter: kafka.ResourceTypeTopic, + ResourceNameFilter: topicName, + ResourcePatternTypeFilter: kafka.PatternTypeLiteral, + PrincipalFilter: principal, + HostFilter: "*", + PermissionType: kafka.ACLPermissionTypeAllow, + Operation: kafka.ACLOperationTypeRead, + }) + require.NoError(t, err) + require.Equal(t, []admin.ACLInfo{ + { + ResourceType: admin.ResourceType(kafka.ResourceTypeTopic), + ResourceName: topicName, + PatternType: admin.PatternType(kafka.PatternTypeLiteral), + Principal: principal, + Host: "*", + Operation: admin.ACLOperationType(kafka.ACLOperationTypeRead), + PermissionType: admin.ACLPermissionType(kafka.ACLPermissionTypeAllow), + }, + }, acl) +} + +func TestCreateExistingACLs(t *testing.T) { + if !util.CanTestBrokerAdminSecurity() { + t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN_SECURITY is not set") + } + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + principal := util.RandomString("User:acl-create-", 6) + topicName := util.RandomString("acl-create-", 6) + + aclConfig := config.ACLConfig{ + Meta: config.ResourceMeta{ + Name: "test-acl", + Cluster: "test-cluster", + Region: "test-region", + Environment: "test-environment", + }, + Spec: config.ACLSpec{ + ACLs: []config.ACL{ + { + Resource: config.ACLResource{ + Type: kafka.ResourceTypeTopic, + Name: topicName, + PatternType: kafka.PatternTypeLiteral, + Principal: principal, + Host: "*", + Permission: kafka.ACLPermissionTypeAllow, + }, + Operations: []kafka.ACLOperationType{ + kafka.ACLOperationTypeRead, + }, + }, + }, + }, + } + aclAdmin := testACLAdmin(ctx, t, aclConfig) + defer aclAdmin.adminClient.Close() + + defer func() { + _, err := aclAdmin.adminClient.GetConnector().KafkaClient.DeleteACLs(ctx, + &kafka.DeleteACLsRequest{ + Filters: []kafka.DeleteACLsFilter{ + { + ResourceTypeFilter: kafka.ResourceTypeTopic, + ResourceNameFilter: topicName, + ResourcePatternTypeFilter: kafka.PatternTypeLiteral, + PrincipalFilter: principal, + HostFilter: "*", + PermissionType: kafka.ACLPermissionTypeAllow, + Operation: kafka.ACLOperationTypeRead, + }, + }, + }, + ) + + if err != nil { + t.Fatal(fmt.Errorf("failed to clean up ACL, err: %v", err)) + } + }() + err := aclAdmin.Create(ctx) + require.NoError(t, err) + acl, err := aclAdmin.adminClient.GetACLs(ctx, kafka.ACLFilter{ + ResourceTypeFilter: kafka.ResourceTypeTopic, + ResourceNameFilter: topicName, + ResourcePatternTypeFilter: kafka.PatternTypeLiteral, + PrincipalFilter: principal, + HostFilter: "*", + PermissionType: kafka.ACLPermissionTypeAllow, + Operation: kafka.ACLOperationTypeRead, + }) + require.NoError(t, err) + require.Equal(t, []admin.ACLInfo{ + { + ResourceType: admin.ResourceType(kafka.ResourceTypeTopic), + ResourceName: topicName, + PatternType: admin.PatternType(kafka.PatternTypeLiteral), + Principal: principal, + Host: "*", + Operation: admin.ACLOperationType(kafka.ACLOperationTypeRead), + PermissionType: admin.ACLPermissionType(kafka.ACLPermissionTypeAllow), + }, + }, acl) + // Run create again and make sure it is idempotent + err = aclAdmin.Create(ctx) + require.NoError(t, err) + acl, err = aclAdmin.adminClient.GetACLs(ctx, kafka.ACLFilter{ + ResourceTypeFilter: kafka.ResourceTypeTopic, + ResourceNameFilter: topicName, + ResourcePatternTypeFilter: kafka.PatternTypeLiteral, + PrincipalFilter: principal, + HostFilter: "*", + PermissionType: kafka.ACLPermissionTypeAllow, + Operation: kafka.ACLOperationTypeRead, + }) + require.NoError(t, err) + require.Equal(t, []admin.ACLInfo{ + { + ResourceType: admin.ResourceType(kafka.ResourceTypeTopic), + ResourceName: topicName, + PatternType: admin.PatternType(kafka.PatternTypeLiteral), + Principal: principal, + Host: "*", + Operation: admin.ACLOperationType(kafka.ACLOperationTypeRead), + PermissionType: admin.ACLPermissionType(kafka.ACLPermissionTypeAllow), + }, + }, acl) +} + +func TestCreateACLsDryRun(t *testing.T) { + if !util.CanTestBrokerAdminSecurity() { + t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN_SECURITY is not set") + } + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + principal := util.RandomString("User:acl-create-", 6) + topicName := util.RandomString("acl-create-", 6) + + aclConfig := config.ACLConfig{ + Meta: config.ResourceMeta{ + Name: "test-acl", + Cluster: "test-cluster", + Region: "test-region", + Environment: "test-environment", + }, + Spec: config.ACLSpec{ + ACLs: []config.ACL{ + { + Resource: config.ACLResource{ + Type: kafka.ResourceTypeTopic, + Name: topicName, + PatternType: kafka.PatternTypeLiteral, + Principal: principal, + Host: "*", + Permission: kafka.ACLPermissionTypeAllow, + }, + Operations: []kafka.ACLOperationType{ + kafka.ACLOperationTypeRead, + }, + }, + }, + }, + } + aclAdmin := testACLAdmin(ctx, t, aclConfig) + defer aclAdmin.adminClient.Close() + aclAdmin.config.DryRun = true + + err := aclAdmin.Create(ctx) + require.NoError(t, err) + acl, err := aclAdmin.adminClient.GetACLs(ctx, kafka.ACLFilter{ + ResourceTypeFilter: kafka.ResourceTypeTopic, + ResourceNameFilter: topicName, + ResourcePatternTypeFilter: kafka.PatternTypeLiteral, + PrincipalFilter: principal, + HostFilter: "*", + PermissionType: kafka.ACLPermissionTypeAllow, + Operation: kafka.ACLOperationTypeRead, + }) + require.NoError(t, err) + require.Equal(t, []admin.ACLInfo{}, acl) +} + +func TestDeleteACLs(t *testing.T) { + if !util.CanTestBrokerAdminSecurity() { + t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN_SECURITY is not set") + } + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + principal := util.RandomString("User:acl-delete-", 6) + topicName := util.RandomString("acl-delete-", 6) + + aclConfig := config.ACLConfig{ + Meta: config.ResourceMeta{ + Name: "test-acl", + Cluster: "test-cluster", + Region: "test-region", + Environment: "test-environment", + }, + Spec: config.ACLSpec{ + ACLs: []config.ACL{ + { + Resource: config.ACLResource{ + Type: kafka.ResourceTypeTopic, + Name: topicName, + PatternType: kafka.PatternTypeLiteral, + Principal: principal, + Host: "*", + Permission: kafka.ACLPermissionTypeAllow, + }, + Operations: []kafka.ACLOperationType{ + kafka.ACLOperationTypeRead, + }, + }, + }, + }, + } + aclAdmin := testACLAdmin(ctx, t, aclConfig) + defer aclAdmin.adminClient.Close() + + err := aclAdmin.Create(ctx) + require.NoError(t, err) + acl, err := aclAdmin.adminClient.GetACLs(ctx, kafka.ACLFilter{ + ResourceTypeFilter: kafka.ResourceTypeTopic, + ResourceNameFilter: topicName, + ResourcePatternTypeFilter: kafka.PatternTypeLiteral, + PrincipalFilter: principal, + HostFilter: "*", + PermissionType: kafka.ACLPermissionTypeAllow, + Operation: kafka.ACLOperationTypeRead, + }) + require.NoError(t, err) + require.Equal(t, []admin.ACLInfo{ + { + ResourceType: admin.ResourceType(kafka.ResourceTypeTopic), + ResourceName: topicName, + PatternType: admin.PatternType(kafka.PatternTypeLiteral), + Principal: principal, + Host: "*", + Operation: admin.ACLOperationType(kafka.ACLOperationTypeRead), + PermissionType: admin.ACLPermissionType(kafka.ACLPermissionTypeAllow), + }, + }, acl) + + err = aclAdmin.Delete(ctx, kafka.DeleteACLsFilter{ + ResourceTypeFilter: kafka.ResourceTypeTopic, + ResourceNameFilter: topicName, + ResourcePatternTypeFilter: kafka.PatternTypeLiteral, + PrincipalFilter: principal, + HostFilter: "*", + PermissionType: kafka.ACLPermissionTypeAllow, + Operation: kafka.ACLOperationTypeRead, + }) + require.NoError(t, err) + acl, err = aclAdmin.adminClient.GetACLs(ctx, kafka.ACLFilter{ + ResourceTypeFilter: kafka.ResourceTypeTopic, + ResourceNameFilter: topicName, + ResourcePatternTypeFilter: kafka.PatternTypeLiteral, + PrincipalFilter: principal, + HostFilter: "*", + PermissionType: kafka.ACLPermissionTypeAllow, + Operation: kafka.ACLOperationTypeRead, + }) + require.NoError(t, err) + require.Equal(t, []admin.ACLInfo{}, acl) +} + +func TestDeleteMultipleACLs(t *testing.T) { + if !util.CanTestBrokerAdminSecurity() { + t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN_SECURITY is not set") + } + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + principal := util.RandomString("User:acl-delete-multi-", 6) + topicName := util.RandomString("acl-delete-multi-", 6) + + // Create 3 ACLs, two for topics and one for groups + aclConfig := config.ACLConfig{ + Meta: config.ResourceMeta{ + Name: "test-acl", + Cluster: "test-cluster", + Region: "test-region", + Environment: "test-environment", + }, + Spec: config.ACLSpec{ + ACLs: []config.ACL{ + { + Resource: config.ACLResource{ + Type: kafka.ResourceTypeTopic, + Name: topicName, + PatternType: kafka.PatternTypeLiteral, + Principal: principal, + Host: "*", + Permission: kafka.ACLPermissionTypeAllow, + }, + Operations: []kafka.ACLOperationType{ + kafka.ACLOperationTypeRead, + }, + }, + { + Resource: config.ACLResource{ + Type: kafka.ResourceTypeTopic, + Name: topicName, + PatternType: kafka.PatternTypeLiteral, + Principal: principal, + Host: "*", + Permission: kafka.ACLPermissionTypeDeny, + }, + Operations: []kafka.ACLOperationType{ + kafka.ACLOperationTypeRead, + }, + }, + { + Resource: config.ACLResource{ + Type: kafka.ResourceTypeGroup, + Name: topicName, + PatternType: kafka.PatternTypeLiteral, + Principal: principal, + Host: "*", + Permission: kafka.ACLPermissionTypeAllow, + }, + Operations: []kafka.ACLOperationType{ + kafka.ACLOperationTypeRead, + }, + }, + }, + }, + } + aclAdmin := testACLAdmin(ctx, t, aclConfig) + defer aclAdmin.adminClient.Close() + + err := aclAdmin.Create(ctx) + require.NoError(t, err) + + defer func() { + _, err := aclAdmin.adminClient.GetConnector().KafkaClient.DeleteACLs(ctx, + &kafka.DeleteACLsRequest{ + Filters: []kafka.DeleteACLsFilter{ + { + ResourceTypeFilter: kafka.ResourceTypeGroup, + ResourceNameFilter: topicName, + ResourcePatternTypeFilter: kafka.PatternTypeLiteral, + PrincipalFilter: principal, + HostFilter: "*", + PermissionType: kafka.ACLPermissionTypeAllow, + Operation: kafka.ACLOperationTypeRead, + }, + }, + }, + ) + + if err != nil { + t.Fatal(fmt.Errorf("failed to clean up ACL, err: %v", err)) + } + }() + acl, err := aclAdmin.adminClient.GetACLs(ctx, kafka.ACLFilter{ + ResourceTypeFilter: kafka.ResourceTypeTopic, + ResourceNameFilter: topicName, + ResourcePatternTypeFilter: kafka.PatternTypeLiteral, + PrincipalFilter: principal, + HostFilter: "*", + PermissionType: kafka.ACLPermissionTypeAny, + Operation: kafka.ACLOperationTypeRead, + }) + require.NoError(t, err) + require.ElementsMatch(t, []admin.ACLInfo{ + { + ResourceType: admin.ResourceType(kafka.ResourceTypeTopic), + ResourceName: topicName, + PatternType: admin.PatternType(kafka.PatternTypeLiteral), + Principal: principal, + Host: "*", + Operation: admin.ACLOperationType(kafka.ACLOperationTypeRead), + PermissionType: admin.ACLPermissionType(kafka.ACLPermissionTypeAllow), + }, + { + ResourceType: admin.ResourceType(kafka.ResourceTypeTopic), + ResourceName: topicName, + PatternType: admin.PatternType(kafka.PatternTypeLiteral), + Principal: principal, + Host: "*", + Operation: admin.ACLOperationType(kafka.ACLOperationTypeRead), + PermissionType: admin.ACLPermissionType(kafka.ACLPermissionTypeDeny), + }, + }, acl) + + acl, err = aclAdmin.adminClient.GetACLs(ctx, kafka.ACLFilter{ + ResourceTypeFilter: kafka.ResourceTypeGroup, + ResourceNameFilter: topicName, + ResourcePatternTypeFilter: kafka.PatternTypeLiteral, + PrincipalFilter: principal, + HostFilter: "*", + PermissionType: kafka.ACLPermissionTypeAllow, + Operation: kafka.ACLOperationTypeRead, + }) + require.NoError(t, err) + require.Equal(t, []admin.ACLInfo{ + { + ResourceType: admin.ResourceType(kafka.ResourceTypeGroup), + ResourceName: topicName, + PatternType: admin.PatternType(kafka.PatternTypeLiteral), + Principal: principal, + Host: "*", + Operation: admin.ACLOperationType(kafka.ACLOperationTypeRead), + PermissionType: admin.ACLPermissionType(kafka.ACLPermissionTypeAllow), + }, + }, acl) + + // Delete the two topic ACLs + err = aclAdmin.Delete(ctx, kafka.DeleteACLsFilter{ + ResourceTypeFilter: kafka.ResourceTypeTopic, + ResourceNameFilter: topicName, + ResourcePatternTypeFilter: kafka.PatternTypeLiteral, + PrincipalFilter: principal, + HostFilter: "*", + PermissionType: kafka.ACLPermissionTypeAllow, + Operation: kafka.ACLOperationTypeAny, + }) + require.NoError(t, err) + acl, err = aclAdmin.adminClient.GetACLs(ctx, kafka.ACLFilter{ + ResourceTypeFilter: kafka.ResourceTypeTopic, + ResourceNameFilter: topicName, + ResourcePatternTypeFilter: kafka.PatternTypeLiteral, + PrincipalFilter: principal, + HostFilter: "*", + PermissionType: kafka.ACLPermissionTypeAllow, + Operation: kafka.ACLOperationTypeAny, + }) + require.NoError(t, err) + require.Equal(t, []admin.ACLInfo{}, acl) + + // Verify the group ACL remains + acl, err = aclAdmin.adminClient.GetACLs(ctx, kafka.ACLFilter{ + ResourceTypeFilter: kafka.ResourceTypeGroup, + ResourceNameFilter: topicName, + ResourcePatternTypeFilter: kafka.PatternTypeLiteral, + PrincipalFilter: principal, + HostFilter: "*", + PermissionType: kafka.ACLPermissionTypeAllow, + Operation: kafka.ACLOperationTypeRead, + }) + require.NoError(t, err) + require.Equal(t, []admin.ACLInfo{ + { + ResourceType: admin.ResourceType(kafka.ResourceTypeGroup), + ResourceName: topicName, + PatternType: admin.PatternType(kafka.PatternTypeLiteral), + Principal: principal, + Host: "*", + Operation: admin.ACLOperationType(kafka.ACLOperationTypeRead), + PermissionType: admin.ACLPermissionType(kafka.ACLPermissionTypeAllow), + }, + }, acl) +} + +func TestDeleteACLDoesNotExist(t *testing.T) { + if !util.CanTestBrokerAdminSecurity() { + t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN_SECURITY is not set") + } + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + principal := util.RandomString("User:acl-delete-", 6) + topicName := util.RandomString("acl-delete-", 6) + + aclConfig := config.ACLConfig{ + Meta: config.ResourceMeta{ + Name: "test-acl", + Cluster: "test-cluster", + Region: "test-region", + Environment: "test-environment", + }, + Spec: config.ACLSpec{ + ACLs: []config.ACL{ + { + Resource: config.ACLResource{ + Type: kafka.ResourceTypeTopic, + Name: topicName, + PatternType: kafka.PatternTypeLiteral, + Principal: principal, + Host: "*", + Permission: kafka.ACLPermissionTypeAllow, + }, + Operations: []kafka.ACLOperationType{ + kafka.ACLOperationTypeRead, + }, + }, + }, + }, + } + aclAdmin := testACLAdmin(ctx, t, aclConfig) + defer aclAdmin.adminClient.Close() + + defer func() { + _, err := aclAdmin.adminClient.GetConnector().KafkaClient.DeleteACLs(ctx, + &kafka.DeleteACLsRequest{ + Filters: []kafka.DeleteACLsFilter{ + { + ResourceTypeFilter: kafka.ResourceTypeTopic, + ResourceNameFilter: "does-not-exist", + ResourcePatternTypeFilter: kafka.PatternTypeLiteral, + PrincipalFilter: principal, + HostFilter: "*", + PermissionType: kafka.ACLPermissionTypeAllow, + Operation: kafka.ACLOperationTypeRead, + }, + }, + }, + ) + + if err != nil { + t.Fatal(fmt.Errorf("failed to clean up ACL, err: %v", err)) + } + }() + err := aclAdmin.Create(ctx) + require.NoError(t, err) + acl, err := aclAdmin.adminClient.GetACLs(ctx, kafka.ACLFilter{ + ResourceTypeFilter: kafka.ResourceTypeTopic, + ResourceNameFilter: topicName, + ResourcePatternTypeFilter: kafka.PatternTypeLiteral, + PrincipalFilter: principal, + HostFilter: "*", + PermissionType: kafka.ACLPermissionTypeAllow, + Operation: kafka.ACLOperationTypeRead, + }) + require.NoError(t, err) + require.Equal(t, []admin.ACLInfo{ + { + ResourceType: admin.ResourceType(kafka.ResourceTypeTopic), + ResourceName: topicName, + PatternType: admin.PatternType(kafka.PatternTypeLiteral), + Principal: principal, + Host: "*", + Operation: admin.ACLOperationType(kafka.ACLOperationTypeRead), + PermissionType: admin.ACLPermissionType(kafka.ACLPermissionTypeAllow), + }, + }, acl) + + err = aclAdmin.Delete(ctx, kafka.DeleteACLsFilter{ + ResourceTypeFilter: kafka.ResourceTypeTopic, + ResourceNameFilter: "does-not-exist", + ResourcePatternTypeFilter: kafka.PatternTypeLiteral, + PrincipalFilter: principal, + HostFilter: "*", + PermissionType: kafka.ACLPermissionTypeAllow, + Operation: kafka.ACLOperationTypeRead, + }) + require.Error(t, err) + // ACL still exists + acl, err = aclAdmin.adminClient.GetACLs(ctx, kafka.ACLFilter{ + ResourceTypeFilter: kafka.ResourceTypeTopic, + ResourceNameFilter: topicName, + ResourcePatternTypeFilter: kafka.PatternTypeLiteral, + PrincipalFilter: principal, + HostFilter: "*", + PermissionType: kafka.ACLPermissionTypeAllow, + Operation: kafka.ACLOperationTypeRead, + }) + require.NoError(t, err) + require.Equal(t, []admin.ACLInfo{ + { + ResourceType: admin.ResourceType(kafka.ResourceTypeTopic), + ResourceName: topicName, + PatternType: admin.PatternType(kafka.PatternTypeLiteral), + Principal: principal, + Host: "*", + Operation: admin.ACLOperationType(kafka.ACLOperationTypeRead), + PermissionType: admin.ACLPermissionType(kafka.ACLPermissionTypeAllow), + }, + }, acl) +} + +func testACLAdmin( + ctx context.Context, + t *testing.T, + aclConfig config.ACLConfig, +) *ACLAdmin { + clusterConfig := config.ClusterConfig{ + Meta: config.ClusterMeta{ + Name: "test-cluster", + Region: "test-region", + Environment: "test-environment", + }, + Spec: config.ClusterSpec{ + BootstrapAddrs: []string{util.TestKafkaAddr()}, + ZKLockPath: "/topicctl/locks", + }, + } + + adminClient, err := clusterConfig.NewAdminClient(ctx, nil, false, "", "") + require.NoError(t, err) + + aclAdmin, err := NewACLAdmin( + ctx, + adminClient, + ACLAdminConfig{ + ClusterConfig: clusterConfig, + ACLConfig: aclConfig, + DryRun: false, + SkipConfirm: true, + }, + ) + require.NoError(t, err) + return aclAdmin +} diff --git a/pkg/admin/brokerclient.go b/pkg/admin/brokerclient.go index 0d5b2429..5df6a706 100644 --- a/pkg/admin/brokerclient.go +++ b/pkg/admin/brokerclient.go @@ -843,6 +843,28 @@ func (c *BrokerAdminClient) CreateACLs( return nil } +// DeleteACLs deletes ACLs in the cluster. +func (c *BrokerAdminClient) DeleteACLs( + ctx context.Context, + filters []kafka.DeleteACLsFilter, +) (*kafka.DeleteACLsResponse, error) { + if c.config.ReadOnly { + return nil, errors.New("Cannot delete ACL in read-only mode") + } + + req := kafka.DeleteACLsRequest{ + Filters: filters, + } + log.Debugf("DeleteACLs request: %+v", req) + + resp, err := c.client.DeleteACLs(ctx, &req) + log.Debugf("DeleteACLs response: %+v (%+v)", resp, err) + if err != nil { + return nil, err + } + return resp, nil +} + func (c *BrokerAdminClient) GetAllTopicsMetadata( ctx context.Context, ) (*kafka.MetadataResponse, error) { diff --git a/pkg/admin/brokerclient_test.go b/pkg/admin/brokerclient_test.go index 542d225c..763110b6 100644 --- a/pkg/admin/brokerclient_test.go +++ b/pkg/admin/brokerclient_test.go @@ -664,6 +664,102 @@ func TestBrokerClientCreateGetACL(t *testing.T) { assert.Equal(t, expected, aclsInfo) } +func TestBrokerClientDeleteACL(t *testing.T) { + if !util.CanTestBrokerAdminSecurity() { + t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN_SECURITY is not set") + } + + ctx := context.Background() + client, err := NewBrokerAdminClient( + ctx, + BrokerAdminClientConfig{ + ConnectorConfig: ConnectorConfig{ + BrokerAddr: util.TestKafkaAddr(), + }, + }, + ) + require.NoError(t, err) + + principal := util.RandomString("User:user-create-", 6) + topicName := util.RandomString("topic-create-", 6) + + err = client.CreateACLs( + ctx, + []kafka.ACLEntry{ + { + Principal: principal, + PermissionType: kafka.ACLPermissionTypeAllow, + Operation: kafka.ACLOperationTypeRead, + ResourceType: kafka.ResourceTypeTopic, + ResourcePatternType: kafka.PatternTypeLiteral, + ResourceName: topicName, + Host: "*", + }, + }, + ) + require.NoError(t, err) + + filter := kafka.ACLFilter{ + ResourceTypeFilter: kafka.ResourceTypeTopic, + ResourceNameFilter: topicName, + ResourcePatternTypeFilter: kafka.PatternTypeLiteral, + Operation: kafka.ACLOperationTypeRead, + PermissionType: kafka.ACLPermissionTypeAllow, + } + + aclsInfo, err := client.GetACLs(ctx, filter) + require.NoError(t, err) + expected := []ACLInfo{ + { + ResourceType: ResourceType(kafka.ResourceTypeTopic), + ResourceName: topicName, + PatternType: PatternType(kafka.PatternTypeLiteral), + Principal: principal, + Host: "*", + Operation: ACLOperationType(kafka.ACLOperationTypeRead), + PermissionType: ACLPermissionType(kafka.ACLPermissionTypeAllow), + }, + } + assert.Equal(t, expected, aclsInfo) + + deleteFilters := []kafka.DeleteACLsFilter{ + { + ResourceTypeFilter: kafka.ResourceTypeTopic, + ResourceNameFilter: topicName, + ResourcePatternTypeFilter: kafka.PatternTypeLiteral, + Operation: kafka.ACLOperationTypeRead, + PermissionType: kafka.ACLPermissionTypeAllow, + }, + } + + resp, err := client.DeleteACLs(ctx, deleteFilters) + require.NoError(t, err) + expectedDeleteResults := []kafka.DeleteACLsResult{ + { + Error: nil, + MatchingACLs: []kafka.DeleteACLsMatchingACLs{ + { + Error: nil, + ResourceType: kafka.ResourceTypeTopic, + ResourceName: topicName, + ResourcePatternType: kafka.PatternTypeLiteral, + Principal: principal, + Host: "*", + Operation: kafka.ACLOperationTypeRead, + PermissionType: kafka.ACLPermissionTypeAllow, + }, + }, + }, + } + + require.NoError(t, err) + assert.Equal(t, expectedDeleteResults, resp.Results) + + aclsInfo, err = client.GetACLs(ctx, filter) + require.NoError(t, err) + assert.Equal(t, []ACLInfo{}, aclsInfo) +} + func TestBrokerClientCreateGetUsers(t *testing.T) { if !util.CanTestBrokerAdminSecurity() { t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN_SECURITY is not set") @@ -772,3 +868,24 @@ func TestBrokerClientCreateACLReadOnly(t *testing.T) { assert.Equal(t, err, errors.New("Cannot create ACL in read-only mode")) } + +func TestBrokerClientDeleteACLReadOnly(t *testing.T) { + if !util.CanTestBrokerAdminSecurity() { + t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN_SECURITY is not set") + } + + ctx := context.Background() + client, err := NewBrokerAdminClient( + ctx, + BrokerAdminClientConfig{ + ConnectorConfig: ConnectorConfig{ + BrokerAddr: util.TestKafkaAddr(), + }, + ReadOnly: true, + }, + ) + require.NoError(t, err) + _, err = client.DeleteACLs(ctx, []kafka.DeleteACLsFilter{}) + + assert.Equal(t, errors.New("Cannot delete ACL in read-only mode"), err) +} diff --git a/pkg/admin/client.go b/pkg/admin/client.go index 93d20e00..964292e4 100644 --- a/pkg/admin/client.go +++ b/pkg/admin/client.go @@ -83,6 +83,12 @@ type Client interface { acls []kafka.ACLEntry, ) error + // DeleteACLs deletes ACLs in the cluster. + DeleteACLs( + ctx context.Context, + filters []kafka.DeleteACLsFilter, + ) (*kafka.DeleteACLsResponse, error) + // UpsertUser creates or updates an user in zookeeper. UpsertUser( ctx context.Context, diff --git a/pkg/admin/types.go b/pkg/admin/types.go index d55a0e85..9d25b85b 100644 --- a/pkg/admin/types.go +++ b/pkg/admin/types.go @@ -1,6 +1,7 @@ package admin import ( + "encoding/json" "errors" "fmt" "reflect" @@ -88,6 +89,36 @@ type ACLInfo struct { PermissionType ACLPermissionType `json:"permissionType"` } +// FormatACLInfo formats an ACLInfo struct as a string, using the +// string version of all the fields. +func FormatACLInfo(a ACLInfo) string { + alias := struct { + ResourceType string + ResourceName string + PatternType string + Principal string + Host string + Operation string + PermissionType string + }{ + ResourceType: a.ResourceType.String(), + ResourceName: a.ResourceName, + PatternType: a.PatternType.String(), + Principal: a.Principal, + Host: a.Host, + Operation: a.Operation.String(), + PermissionType: a.PermissionType.String(), + } + + content, err := json.MarshalIndent(alias, "", " ") + if err != nil { + log.Warnf("Error marshalling acls: %+v", err) + return "Error" + } + + return string(content) +} + // ResourceType presents the Kafka resource type. // We need to subtype this to be able to define methods to // satisfy the Value interface from Cobra so we can use it diff --git a/pkg/admin/zkclient.go b/pkg/admin/zkclient.go index 8f7f4d9d..6d18e881 100644 --- a/pkg/admin/zkclient.go +++ b/pkg/admin/zkclient.go @@ -436,6 +436,13 @@ func (c *ZKAdminClient) CreateACLs( return errors.New("ACLs not yet supported with zk access mode; omit zk addresses to fix.") } +func (c *ZKAdminClient) DeleteACLs( + ctx context.Context, + filters []kafka.DeleteACLsFilter, +) (*kafka.DeleteACLsResponse, error) { + return nil, errors.New("ACLs not yet supported with zk access mode; omit zk addresses to fix.") +} + func (c *ZKAdminClient) GetUsers( ctx context.Context, names []string, diff --git a/pkg/admin/zkclient_test.go b/pkg/admin/zkclient_test.go index 72ec4c10..3301a737 100644 --- a/pkg/admin/zkclient_test.go +++ b/pkg/admin/zkclient_test.go @@ -1107,6 +1107,21 @@ func TestZkCreateACL(t *testing.T) { assert.Equal(t, err, errors.New("ACLs not yet supported with zk access mode; omit zk addresses to fix.")) } +func TestZkDeleteACL(t *testing.T) { + ctx := context.Background() + adminClient, err := NewZKAdminClient( + ctx, + ZKAdminClientConfig{ + ZKAddrs: []string{util.TestZKAddr()}, + }, + ) + require.NoError(t, err) + defer adminClient.Close() + + _, err = adminClient.DeleteACLs(ctx, []kafka.DeleteACLsFilter{}) + assert.Equal(t, err, errors.New("ACLs not yet supported with zk access mode; omit zk addresses to fix.")) +} + func TestZkGetUsers(t *testing.T) { ctx := context.Background() adminClient, err := NewZKAdminClient( diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index 898f4f45..783963b8 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -14,11 +14,11 @@ import ( "github.com/briandowns/spinner" "github.com/fatih/color" "github.com/segmentio/kafka-go" + "github.com/segmentio/topicctl/pkg/acl" "github.com/segmentio/topicctl/pkg/admin" "github.com/segmentio/topicctl/pkg/apply" "github.com/segmentio/topicctl/pkg/check" "github.com/segmentio/topicctl/pkg/config" - "github.com/segmentio/topicctl/pkg/create" "github.com/segmentio/topicctl/pkg/groups" "github.com/segmentio/topicctl/pkg/messages" log "github.com/sirupsen/logrus" @@ -114,13 +114,14 @@ func (c *CLIRunner) ApplyTopic( // CreateACL does an apply run according to the spec in the argument config. func (c *CLIRunner) CreateACL( ctx context.Context, - creatorConfig create.ACLCreatorConfig, + aclAdminConfig acl.ACLAdminConfig, ) error { - creator, err := create.NewACLCreator( + aclAdmin, err := acl.NewACLAdmin( ctx, c.adminClient, - creatorConfig, + aclAdminConfig, ) + if err != nil { return err } @@ -129,12 +130,13 @@ func (c *CLIRunner) CreateACL( c.printer( "Starting creation for ACLs %s in environment %s, cluster %s", - highlighter(creatorConfig.ACLConfig.Meta.Name), - highlighter(creatorConfig.ACLConfig.Meta.Environment), - highlighter(creatorConfig.ACLConfig.Meta.Cluster), + highlighter(aclAdminConfig.ACLConfig.Meta.Name), + highlighter(aclAdminConfig.ACLConfig.Meta.Environment), + highlighter(aclAdminConfig.ACLConfig.Meta.Cluster), ) - err = creator.Create(ctx) + err = aclAdmin.Create(ctx) + if err != nil { return err } @@ -143,6 +145,31 @@ func (c *CLIRunner) CreateACL( return nil } +// DeleteACL deletes a single ACL. +func (c *CLIRunner) DeleteACL( + ctx context.Context, + aclAdminConfig acl.ACLAdminConfig, + filter kafka.DeleteACLsFilter, +) error { + aclAdmin, err := acl.NewACLAdmin( + ctx, + c.adminClient, + aclAdminConfig, + ) + + if err != nil { + return err + } + + err = aclAdmin.Delete(ctx, filter) + if err != nil { + return err + } + + c.printer("Delete completed successfully!") + return nil +} + // BootstrapTopics creates configs for one or more topics based on their current state in the // cluster. func (c *CLIRunner) BootstrapTopics(