From b1deeea1a1c02e9cff8a29bba2c220b6eff70563 Mon Sep 17 00:00:00 2001 From: Pasindu Dissanayake Date: Mon, 29 Jul 2024 14:33:38 +1000 Subject: [PATCH 1/5] add pod overrides affinity and tolerations --- README.adoc | 4 ++++ internal/common-operation.go | 11 +++++++++++ internal/k8s/executor.go | 4 ++++ internal/k8s/pod_overrides.go | 12 +++++++++++- 4 files changed, 30 insertions(+), 1 deletion(-) diff --git a/README.adoc b/README.adoc index c06b7504..273ba0e7 100644 --- a/README.adoc +++ b/README.adoc @@ -144,6 +144,10 @@ contexts: # optional: nodeSelector to add to the pod nodeSelector: key: value + # optional: affinity to add to the pod + affnity: '{"nodeAffinity": "requiredDuringSchedulingIgnoredDuringExecution": {nodeSelectorTerms: [{"matchExpressions":[{"key":"", "operator":"", "values":[""]}]}]}}' + # optional: tolerations to add to the pod + tolerations: '[{"effect":"","key":"","operator":"","value":""}]' # optional: clientID config (defaults to kafkactl-{username}) clientID: my-client-id diff --git a/internal/common-operation.go b/internal/common-operation.go index 9d264f9b..82efa8ed 100644 --- a/internal/common-operation.go +++ b/internal/common-operation.go @@ -3,6 +3,7 @@ package internal import ( "crypto/tls" "crypto/x509" + "encoding/json" "fmt" "net/http" "os" @@ -74,6 +75,8 @@ type K8sConfig struct { Labels map[string]string Annotations map[string]string NodeSelector map[string]string + Affinity map[string]any + Tolerations []map[string]any } type ConsumerConfig struct { @@ -174,6 +177,14 @@ func CreateClientContext() (ClientContext, error) { context.Kubernetes.Labels = viper.GetStringMapString("contexts." + context.Name + ".kubernetes.labels") context.Kubernetes.Annotations = viper.GetStringMapString("contexts." + context.Name + ".kubernetes.annotations") context.Kubernetes.NodeSelector = viper.GetStringMapString("contexts." + context.Name + ".kubernetes.nodeSelector") + context.Kubernetes.Affinity = viper.GetStringMap("contexts." + context.Name + ".kubernetes.affinity") + + var tolerations []map[string]any + err := json.Unmarshal([]byte(viper.GetString("contexts."+context.Name+".kubernetes.tolerations")), &tolerations) + if err != nil { + return context, err + } + context.Kubernetes.Tolerations = tolerations return context, nil } diff --git a/internal/k8s/executor.go b/internal/k8s/executor.go index 87aef8f1..5e494909 100644 --- a/internal/k8s/executor.go +++ b/internal/k8s/executor.go @@ -37,6 +37,8 @@ type executor struct { labels map[string]string annotations map[string]string nodeSelector map[string]string + affinity map[string]any + tolerations []map[string]any } const letterBytes = "abcdefghijklmnpqrstuvwxyz123456789" @@ -111,6 +113,8 @@ func newExecutor(context internal.ClientContext, runner Runner) *executor { labels: context.Kubernetes.Labels, annotations: context.Kubernetes.Annotations, nodeSelector: context.Kubernetes.NodeSelector, + affinity: context.Kubernetes.Affinity, + tolerations: context.Kubernetes.Tolerations, runner: runner, } } diff --git a/internal/k8s/pod_overrides.go b/internal/k8s/pod_overrides.go index 425b30e2..1ecce656 100644 --- a/internal/k8s/pod_overrides.go +++ b/internal/k8s/pod_overrides.go @@ -13,6 +13,8 @@ type specType struct { ImagePullSecrets []imagePullSecretType `json:"imagePullSecrets,omitempty"` ServiceAccountName *string `json:"serviceAccountName,omitempty"` NodeSelector *map[string]string `json:"nodeSelector,omitempty"` + Affinity *map[string]any `json:"affinity,omitempty"` + Tolerations *[]map[string]any `json:"tolerations,omitempty"` } type PodOverrideType struct { @@ -29,7 +31,7 @@ func (kubectl *executor) createPodOverride() PodOverrideType { var override PodOverrideType override.APIVersion = "v1" - if kubectl.serviceAccount != "" || kubectl.imagePullSecret != "" || len(kubectl.nodeSelector) > 0 { + if kubectl.serviceAccount != "" || kubectl.imagePullSecret != "" || len(kubectl.nodeSelector) > 0 || len(kubectl.affinity) > 0 || len(kubectl.tolerations) > 0 { override.Spec = &specType{} if kubectl.serviceAccount != "" { @@ -44,6 +46,14 @@ func (kubectl *executor) createPodOverride() PodOverrideType { if len(kubectl.nodeSelector) > 0 { override.Spec.NodeSelector = &kubectl.nodeSelector } + + if len(kubectl.affinity) > 0 { + override.Spec.Affinity = &kubectl.affinity + } + + if len(kubectl.tolerations) > 0 { + override.Spec.Tolerations = &kubectl.tolerations + } } if len(kubectl.labels) > 0 || len(kubectl.annotations) > 0 { From c95366da95358abf162e1a9ec106e4ce75965937 Mon Sep 17 00:00:00 2001 From: Pasindu Dissanayake Date: Thu, 1 Aug 2024 15:07:50 +1000 Subject: [PATCH 2/5] add affinity and tolerations pod override change to unlreased changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 44357fa7..28dde215 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +- [#203](https://github.com/deviceinsight/kafkactl/pull/203) Add pod override fields affinity and tolerations ## 5.2.0 - 2024-08-08 From 6775b5e9729dd4ef5856a6484fadcc0e16de4aa3 Mon Sep 17 00:00:00 2001 From: Pasindu Dissanayake Date: Mon, 5 Aug 2024 11:29:29 +1000 Subject: [PATCH 3/5] fix parsing logic for tolerations field --- internal/common-operation.go | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/internal/common-operation.go b/internal/common-operation.go index 82efa8ed..ad3233e4 100644 --- a/internal/common-operation.go +++ b/internal/common-operation.go @@ -178,17 +178,29 @@ func CreateClientContext() (ClientContext, error) { context.Kubernetes.Annotations = viper.GetStringMapString("contexts." + context.Name + ".kubernetes.annotations") context.Kubernetes.NodeSelector = viper.GetStringMapString("contexts." + context.Name + ".kubernetes.nodeSelector") context.Kubernetes.Affinity = viper.GetStringMap("contexts." + context.Name + ".kubernetes.affinity") - - var tolerations []map[string]any - err := json.Unmarshal([]byte(viper.GetString("contexts."+context.Name+".kubernetes.tolerations")), &tolerations) - if err != nil { - return context, err - } - context.Kubernetes.Tolerations = tolerations - + + t, err := convertJsonToListMap("tolerations", viper.GetString("contexts."+context.Name+".kubernetes.tolerations")) + context.Kubernetes.Tolerations = t + if err != nil { + return context, err + } return context, nil } +func convertJsonToListMap(fieldName string, jsonStr string) ([]map[string]any, error){ + var listMap []map[string]any + if (jsonStr == "") { + return listMap, nil + } + err := json.Unmarshal([]byte(jsonStr), &listMap) + if err != nil { + fmt.Errorf("Error parsing %s field", fieldName) + return listMap, err + } + return listMap, nil +} + + func CreateClient(context *ClientContext) (sarama.Client, error) { config, err := CreateClientConfig(context) if err == nil { From b8037a5044da4872d491da998c51ae50b39fbe8e Mon Sep 17 00:00:00 2001 From: Dirk Wilden Date: Wed, 14 Aug 2024 11:55:22 +0200 Subject: [PATCH 4/5] change toleration config to be in yaml format --- README.adoc | 21 ++++++++++++++++++--- internal/common-operation.go | 35 +++++++++++++---------------------- internal/k8s/executor.go | 2 +- internal/k8s/pod_overrides.go | 12 +++++++----- 4 files changed, 39 insertions(+), 31 deletions(-) diff --git a/README.adoc b/README.adoc index 273ba0e7..531a30f0 100644 --- a/README.adoc +++ b/README.adoc @@ -144,10 +144,25 @@ contexts: # optional: nodeSelector to add to the pod nodeSelector: key: value + # optional: affinity to add to the pod - affnity: '{"nodeAffinity": "requiredDuringSchedulingIgnoredDuringExecution": {nodeSelectorTerms: [{"matchExpressions":[{"key":"", "operator":"", "values":[""]}]}]}}' + affinity: + # note: other types of affinity also supported + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: "" + operator: "" + values: [ "" ] + # optional: tolerations to add to the pod - tolerations: '[{"effect":"","key":"","operator":"","value":""}]' + tolerations: + - key: "" + operator: "" + value: "" + effect: "" + # optional: clientID config (defaults to kafkactl-{username}) clientID: my-client-id @@ -628,7 +643,7 @@ Producing protobuf message converted from JSON: kafkactl produce my-topic --key='{"keyField":123}' --key-proto-type MyKeyMessage --value='{"valueField":"value"}' --value-proto-type MyValueMessage --proto-file kafkamsg.proto ---- -A more complex protobuf message converted from a multi-line JSON string can be produced using a file input with custom separators. +A more complex protobuf message converted from a multi-line JSON string can be produced using a file input with custom separators. For example, if you have the following protobuf definition (`complex.proto`): diff --git a/internal/common-operation.go b/internal/common-operation.go index ad3233e4..b63ceda5 100644 --- a/internal/common-operation.go +++ b/internal/common-operation.go @@ -3,7 +3,6 @@ package internal import ( "crypto/tls" "crypto/x509" - "encoding/json" "fmt" "net/http" "os" @@ -62,6 +61,13 @@ type TLSConfig struct { Insecure bool } +type K8sToleration struct { + Key string `json:"key" yaml:"key"` + Operator string `json:"operator" yaml:"operator"` + Value string `json:"value" yaml:"value"` + Effect string `json:"effect" yaml:"effect"` +} + type K8sConfig struct { Enabled bool Binary string @@ -76,7 +82,7 @@ type K8sConfig struct { Annotations map[string]string NodeSelector map[string]string Affinity map[string]any - Tolerations []map[string]any + Tolerations []K8sToleration } type ConsumerConfig struct { @@ -178,28 +184,13 @@ func CreateClientContext() (ClientContext, error) { context.Kubernetes.Annotations = viper.GetStringMapString("contexts." + context.Name + ".kubernetes.annotations") context.Kubernetes.NodeSelector = viper.GetStringMapString("contexts." + context.Name + ".kubernetes.nodeSelector") context.Kubernetes.Affinity = viper.GetStringMap("contexts." + context.Name + ".kubernetes.affinity") - - t, err := convertJsonToListMap("tolerations", viper.GetString("contexts."+context.Name+".kubernetes.tolerations")) - context.Kubernetes.Tolerations = t - if err != nil { - return context, err - } - return context, nil -} -func convertJsonToListMap(fieldName string, jsonStr string) ([]map[string]any, error){ - var listMap []map[string]any - if (jsonStr == "") { - return listMap, nil - } - err := json.Unmarshal([]byte(jsonStr), &listMap) - if err != nil { - fmt.Errorf("Error parsing %s field", fieldName) - return listMap, err - } - return listMap, nil -} + if err := viper.UnmarshalKey("contexts."+context.Name+".kubernetes.tolerations", &context.Kubernetes.Tolerations); err != nil { + return context, err + } + return context, nil +} func CreateClient(context *ClientContext) (sarama.Client, error) { config, err := CreateClientConfig(context) diff --git a/internal/k8s/executor.go b/internal/k8s/executor.go index 5e494909..657bf158 100644 --- a/internal/k8s/executor.go +++ b/internal/k8s/executor.go @@ -38,7 +38,7 @@ type executor struct { annotations map[string]string nodeSelector map[string]string affinity map[string]any - tolerations []map[string]any + tolerations []internal.K8sToleration } const letterBytes = "abcdefghijklmnpqrstuvwxyz123456789" diff --git a/internal/k8s/pod_overrides.go b/internal/k8s/pod_overrides.go index 1ecce656..c56f92dc 100644 --- a/internal/k8s/pod_overrides.go +++ b/internal/k8s/pod_overrides.go @@ -1,5 +1,7 @@ package k8s +import "github.com/deviceinsight/kafkactl/v5/internal" + type imagePullSecretType struct { Name string `json:"name"` } @@ -10,11 +12,11 @@ type metadataType struct { } type specType struct { - ImagePullSecrets []imagePullSecretType `json:"imagePullSecrets,omitempty"` - ServiceAccountName *string `json:"serviceAccountName,omitempty"` - NodeSelector *map[string]string `json:"nodeSelector,omitempty"` - Affinity *map[string]any `json:"affinity,omitempty"` - Tolerations *[]map[string]any `json:"tolerations,omitempty"` + ImagePullSecrets []imagePullSecretType `json:"imagePullSecrets,omitempty"` + ServiceAccountName *string `json:"serviceAccountName,omitempty"` + NodeSelector *map[string]string `json:"nodeSelector,omitempty"` + Affinity *map[string]any `json:"affinity,omitempty"` + Tolerations *[]internal.K8sToleration `json:"tolerations,omitempty"` } type PodOverrideType struct { From 263498e43054296124b5599b923d7b6758c7ebbc Mon Sep 17 00:00:00 2001 From: Dirk Wilden Date: Wed, 14 Aug 2024 12:02:01 +0200 Subject: [PATCH 5/5] fix linter error with new golancilint version --- .../consumergroupoffsets/consumer-group-offset-operation.go | 2 +- internal/producer/producer-operation.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/consumergroupoffsets/consumer-group-offset-operation.go b/internal/consumergroupoffsets/consumer-group-offset-operation.go index 749bfac2..94bfff50 100644 --- a/internal/consumergroupoffsets/consumer-group-offset-operation.go +++ b/internal/consumergroupoffsets/consumer-group-offset-operation.go @@ -29,7 +29,7 @@ type ConsumerGroupOffsetOperation struct { func (operation *ConsumerGroupOffsetOperation) ResetConsumerGroupOffset(flags ResetConsumerGroupOffsetFlags, groupName string) error { - if (flags.Topic == nil || len(flags.Topic) == 0) && (!flags.AllTopics) { + if (len(flags.Topic) == 0) && (!flags.AllTopics) { return errors.New("no topic specified") } diff --git a/internal/producer/producer-operation.go b/internal/producer/producer-operation.go index ee0163a7..a514c1dd 100644 --- a/internal/producer/producer-operation.go +++ b/internal/producer/producer-operation.go @@ -213,7 +213,7 @@ func (operation *Operation) Produce(topic string, flags Flags) error { } if inputMessage, err = inputParser.ParseLine(line); err != nil { - return failWithMessageCount(messageCount, err.Error()) + return failWithMessageCount(messageCount, err.Error()) //nolint:govet } messageCount++