Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow users to use well-known ports for client connections to both ingress controllers #997

Merged
merged 10 commits into from
Jun 21, 2023
Merged
6 changes: 3 additions & 3 deletions controllers/tests/kafkacluster_controller_envoy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func expectEnvoyLoadBalancer(ctx context.Context, kafkaCluster *v1beta1.KafkaClu
Expect(loadBalancer.Spec.Ports[3].Name).To(Equal("tcp-all-broker"))
Expect(loadBalancer.Spec.Ports[3].Protocol).To(Equal(corev1.ProtocolTCP))
Expect(loadBalancer.Spec.Ports[3].Port).To(BeEquivalentTo(29092))
Expect(loadBalancer.Spec.Ports[3].TargetPort.IntVal).To(BeEquivalentTo(29092))
Expect(loadBalancer.Spec.Ports[3].TargetPort.StrVal).To(BeEquivalentTo("tcp-all-broker"))

Expect(loadBalancer.Spec.Ports[4].Name).To(Equal("tcp-health"))
Expect(loadBalancer.Spec.Ports[4].Protocol).To(Equal(corev1.ProtocolTCP))
Expand Down Expand Up @@ -391,7 +391,7 @@ func expectEnvoyWithConfigAz1(ctx context.Context, kafkaCluster *v1beta1.KafkaCl
Expect(loadBalancer.Spec.Ports[1].Name).To(Equal("tcp-all-broker"))
Expect(loadBalancer.Spec.Ports[1].Protocol).To(Equal(corev1.ProtocolTCP))
Expect(loadBalancer.Spec.Ports[1].Port).To(BeEquivalentTo(29092))
Expect(loadBalancer.Spec.Ports[1].TargetPort.IntVal).To(BeEquivalentTo(29092))
Expect(loadBalancer.Spec.Ports[1].TargetPort.StrVal).To(BeEquivalentTo("tcp-all-broker"))

Expect(loadBalancer.Spec.Ports[2].Name).To(Equal("tcp-health"))
Expect(loadBalancer.Spec.Ports[2].Protocol).To(Equal(corev1.ProtocolTCP))
Expand Down Expand Up @@ -601,7 +601,7 @@ func expectEnvoyWithConfigAz2(ctx context.Context, kafkaCluster *v1beta1.KafkaCl
Expect(loadBalancer.Spec.Ports[2].Name).To(Equal("tcp-all-broker"))
Expect(loadBalancer.Spec.Ports[2].Protocol).To(Equal(corev1.ProtocolTCP))
Expect(loadBalancer.Spec.Ports[2].Port).To(BeEquivalentTo(29092))
Expect(loadBalancer.Spec.Ports[2].TargetPort.IntVal).To(BeEquivalentTo(29092))
Expect(loadBalancer.Spec.Ports[2].TargetPort.StrVal).To(BeEquivalentTo("tcp-all-broker"))

Expect(loadBalancer.Spec.Ports[3].Name).To(Equal("tcp-health"))
Expect(loadBalancer.Spec.Ports[3].Protocol).To(Equal(corev1.ProtocolTCP))
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/banzaicloud/istio-client-go v0.0.17
github.com/banzaicloud/istio-operator/api/v2 v2.15.1
github.com/banzaicloud/k8s-objectmatcher v1.8.0
github.com/banzaicloud/koperator/api v0.28.2
github.com/banzaicloud/koperator/api v0.28.3
github.com/banzaicloud/koperator/properties v0.4.1
github.com/cert-manager/cert-manager v1.11.2
github.com/cisco-open/cluster-registry-controller/api v0.2.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ github.com/banzaicloud/istio-operator/api/v2 v2.15.1 h1:BZg8COvoOJtfx/dgN7KpoOnc
github.com/banzaicloud/istio-operator/api/v2 v2.15.1/go.mod h1:5qCpwWlIfxiLvBfTvT2mD2wp5RlFCDEt8Xql4sYPNBc=
github.com/banzaicloud/k8s-objectmatcher v1.8.0 h1:Nugn25elKtPMTA2br+JgHNeSQ04sc05MDPmpJnd1N2A=
github.com/banzaicloud/k8s-objectmatcher v1.8.0/go.mod h1:p2LSNAjlECf07fbhDyebTkPUIYnU05G+WfGgkTmgeMg=
github.com/banzaicloud/koperator/api v0.28.2 h1:x3n9dl+zhePgfwXFZlbzqeoI8veF4gL7JJAdKvfc61M=
github.com/banzaicloud/koperator/api v0.28.2/go.mod h1:MpHo+CO1TmVyHooJ3n3+o6ibL/AKlBp7L9bdyKy5Ec0=
github.com/banzaicloud/koperator/api v0.28.3 h1:bD4vBJr0Bsw+oX1kDV+hp/XR5hWp+g3YwIJi7TuHh6o=
github.com/banzaicloud/koperator/api v0.28.3/go.mod h1:fo0y8UdiH9YPE+sIK5LcJWG6hd0pIA13F4li6DOIal4=
github.com/banzaicloud/koperator/properties v0.4.1 h1:SB2QgXlcK1Dc7Z1rg65PJifErDa8OQnoWCCJgmC7SGc=
github.com/banzaicloud/koperator/properties v0.4.1/go.mod h1:TcL+llxuhW3UeQtVEDYEXGouFLF2P+LuZZVudSb6jyA=
github.com/banzaicloud/operator-tools v0.28.0 h1:GSfc0qZr6zo7WrNxdgWZE1LcTChPU8QFYOTDirYVtIM=
Expand Down
2 changes: 1 addition & 1 deletion pkg/resources/envoy/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ func GenerateEnvoyConfig(kc *v1beta1.KafkaCluster, elistener v1beta1.ExternalLis
SocketAddress: &envoycore.SocketAddress{
Address: "0.0.0.0",
PortSpecifier: &envoycore.SocketAddress_PortValue{
PortValue: uint32(elistener.GetAnyCastPort()),
PortValue: uint32(elistener.GetIngressControllerTargetPort()),
},
},
},
Expand Down
8 changes: 6 additions & 2 deletions pkg/resources/envoy/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ func getExposedContainerPorts(extListener v1beta1.ExternalListenerConfig, broker
}
}
exposedPorts = append(exposedPorts, corev1.ContainerPort{
Name: fmt.Sprintf(kafkautils.AllBrokerServiceTemplate, "tcp"),
ContainerPort: extListener.GetAnyCastPort(),
Name: getAllBrokerContainerPortName(),
ContainerPort: extListener.GetIngressControllerTargetPort(),
Protocol: corev1.ProtocolTCP,
})

Expand All @@ -163,3 +163,7 @@ func generatePodAnnotations(kafkaCluster *v1beta1.KafkaCluster,
}
return util.MergeAnnotations(ingressConfig.EnvoyConfig.GetAnnotations(), annotations)
}

func getAllBrokerContainerPortName() string {
return fmt.Sprintf(kafkautils.AllBrokerServiceTemplate, "tcp")
}
2 changes: 1 addition & 1 deletion pkg/resources/envoy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func getExposedServicePorts(extListener v1beta1.ExternalListenerConfig, brokersI
// append anycast port
exposedPorts = append(exposedPorts, corev1.ServicePort{
Name: fmt.Sprintf(kafkautils.AllBrokerServiceTemplate, "tcp"),
TargetPort: intstr.FromInt(int(extListener.GetAnyCastPort())),
TargetPort: intstr.FromString(getAllBrokerContainerPortName()),
Port: extListener.GetAnyCastPort(),
Protocol: corev1.ProtocolTCP,
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/resources/istioingress/meshgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func generateExternalPorts(kc *v1beta1.KafkaCluster, brokerIds []int,
Name: fmt.Sprintf(kafkautils.AllBrokerServiceTemplate, "tcp"),
Protocol: string(corev1.ProtocolTCP),
Port: externalListenerConfig.GetAnyCastPort(),
TargetPort: &istioOperatorApi.IntOrString{IntOrString: intstr.FromInt(int(externalListenerConfig.GetAnyCastPort()))},
TargetPort: &istioOperatorApi.IntOrString{IntOrString: intstr.FromInt(int(externalListenerConfig.GetIngressControllerTargetPort()))},
Copy link
Member Author

@panyuenlau panyuenlau Jun 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI - looks like named targetPort can't be used here, becasue istio-operator complains about not having an exact value for this field when it reconciles the IstioMeshGateway resource. But I could me missing something

Copy link
Member

@pregnor pregnor Jun 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's saddening.

})

return generatedPorts
Expand Down
4 changes: 4 additions & 0 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,3 +548,7 @@ func RetryOnError(backoff wait.Backoff, fn func() error, isRetryableError func(e
func RetryOnConflict(backoff wait.Backoff, fn func() error) error {
return RetryOnError(backoff, fn, apierrors.IsConflict)
}

func GetExternalPortForBroker(externalStartingPort, brokerId int32) int32 {
return externalStartingPort + brokerId
}
15 changes: 8 additions & 7 deletions pkg/webhooks/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ import (
)

const (
cantConnectErrorMsg = "failed to connect to kafka cluster"
cantConnectAPIServerMsg = "failed to connect to Kubernetes API server"
invalidReplicationFactorErrMsg = "replication factor is larger than the number of nodes in the kafka cluster"
outOfRangeReplicationFactorErrMsg = "replication factor must be larger than 0 (or set it to be -1 to use the broker's default)"
outOfRangePartitionsErrMsg = "number of partitions must be larger than 0 (or set it to be -1 to use the broker's default)"
unsupportedRemovingStorageMsg = "removing storage from a broker is not supported"
invalidExternalListenerStartingPortErrMsg = "invalid external listener starting port number"
cantConnectErrorMsg = "failed to connect to kafka cluster"
cantConnectAPIServerMsg = "failed to connect to Kubernetes API server"
invalidReplicationFactorErrMsg = "replication factor is larger than the number of nodes in the kafka cluster"
outOfRangeReplicationFactorErrMsg = "replication factor must be larger than 0 (or set it to be -1 to use the broker's default)"
outOfRangePartitionsErrMsg = "number of partitions must be larger than 0 (or set it to be -1 to use the broker's default)"
unsupportedRemovingStorageMsg = "removing storage from a broker is not supported"
invalidExternalListenerStartingPortErrMsg = "invalid external listener starting port number"
invalidContainerPortForIngressControllerErrMsg = "invalid trarget port number for ingress controller deployment"

// errorDuringValidationMsg is added to infrastructure errors (e.g. failed to connect), but not to field validation errors
errorDuringValidationMsg = "error during validation"
Expand Down
92 changes: 85 additions & 7 deletions pkg/webhooks/kafkacluster_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"emperror.dev/errors"
"golang.org/x/exp/slices"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/validation/field"
Expand Down Expand Up @@ -99,7 +100,7 @@ func checkBrokerStorageRemoval(kafkaClusterSpecOld, kafkaClusterSpecNew *banzaic
if err != nil {
return nil, err
}
// checking broukerConfigGroup existence
// checking brokerConfigGroup existence
if brokerNew.BrokerConfigGroup != "" {
if _, exists := kafkaClusterSpecNew.BrokerConfigGroups[brokerNew.BrokerConfigGroup]; !exists {
return field.Invalid(field.NewPath("spec").Child("brokers").Index(int(brokerNew.Id)).Child("brokerConfigGroup"), brokerNew.BrokerConfigGroup, unsupportedRemovingStorageMsg+", provided brokerConfigGroup not found"), nil
Expand Down Expand Up @@ -162,10 +163,24 @@ func getMissingMounthPathLocation(mounthPath string, kafkaClusterSpec *banzaiclo
func checkInternalAndExternalListeners(kafkaClusterSpec *banzaicloudv1beta1.KafkaClusterSpec) field.ErrorList {
var allErrs field.ErrorList

allErrs = append(allErrs, checkUniqueListenerContainerPort(kafkaClusterSpec.ListenersConfig)...)
allErrs = append(allErrs, checkInternalListeners(kafkaClusterSpec)...)

allErrs = append(allErrs, checkExternalListeners(kafkaClusterSpec)...)

return allErrs
}

func checkInternalListeners(kafkaClusterSpec *banzaicloudv1beta1.KafkaClusterSpec) field.ErrorList {
return checkUniqueListenerContainerPort(kafkaClusterSpec.ListenersConfig)
}

func checkExternalListeners(kafkaClusterSpec *banzaicloudv1beta1.KafkaClusterSpec) field.ErrorList {
var allErrs field.ErrorList

allErrs = append(allErrs, checkExternalListenerStartingPort(kafkaClusterSpec)...)

allErrs = append(allErrs, checkTargetPortsCollisionForEnvoy(kafkaClusterSpec)...)

return allErrs
}

Expand Down Expand Up @@ -205,17 +220,80 @@ func checkExternalListenerStartingPort(kafkaClusterSpec *banzaicloudv1beta1.Kafk
var allErrs field.ErrorList
const maxPort int32 = 65535
for i, extListener := range kafkaClusterSpec.ListenersConfig.ExternalListeners {
var invalidBrokerIDs []int32
var outOfRangeBrokerIDs, collidingPortsBrokerIDs []int32
for _, broker := range kafkaClusterSpec.Brokers {
if extListener.ExternalStartingPort+broker.Id < 1 || extListener.ExternalStartingPort+broker.Id > maxPort {
invalidBrokerIDs = append(invalidBrokerIDs, broker.Id)
externalPort := util.GetExternalPortForBroker(extListener.ExternalStartingPort, broker.Id)
if externalPort < 1 || externalPort > maxPort {
outOfRangeBrokerIDs = append(outOfRangeBrokerIDs, broker.Id)
}

if externalPort == extListener.GetIngressControllerTargetPort() {
collidingPortsBrokerIDs = append(collidingPortsBrokerIDs, broker.Id)
}

if kafkaClusterSpec.GetIngressController() == "envoy" {
if externalPort == kafkaClusterSpec.EnvoyConfig.GetEnvoyAdminPort() || externalPort == kafkaClusterSpec.EnvoyConfig.GetEnvoyHealthCheckPort() {
collidingPortsBrokerIDs = append(collidingPortsBrokerIDs, broker.Id)
}
}
}
if len(invalidBrokerIDs) > 0 {
errmsg := invalidExternalListenerStartingPortErrMsg + ": " + fmt.Sprintf("ExternalListener '%s' would generate external access port numbers (externalStartingPort + Broker ID) that are out of range (not between 1 and 65535) for brokers %v", extListener.Name, invalidBrokerIDs)

if len(outOfRangeBrokerIDs) > 0 {
errmsg := invalidExternalListenerStartingPortErrMsg + ": " + fmt.Sprintf("ExternalListener '%s' would generate external access port numbers (externalStartingPort + Broker ID) that are out of range (not between 1 and 65535) for brokers %v", extListener.Name, outOfRangeBrokerIDs)
fldErr := field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(i).Child("externalStartingPort"), extListener.ExternalStartingPort, errmsg)
allErrs = append(allErrs, fldErr)
}

if len(collidingPortsBrokerIDs) > 0 {
errmsg := invalidExternalListenerStartingPortErrMsg + ": " + fmt.Sprintf("ExternalListener '%s' would generate external access port numbers ("+
"externalStartingPort + Broker ID) that collide with either the envoy admin port ('%d'), the envoy health-check port ('%d'), or the ingressControllerTargetPort ('%d') for brokers %v",
extListener.Name, kafkaClusterSpec.EnvoyConfig.GetEnvoyAdminPort(), kafkaClusterSpec.EnvoyConfig.GetEnvoyHealthCheckPort(), extListener.GetIngressControllerTargetPort(), collidingPortsBrokerIDs)
fldErr := field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(i).Child("externalStartingPort"), extListener.ExternalStartingPort, errmsg)
allErrs = append(allErrs, fldErr)
}
}
return allErrs
}

// checkTargetPortsCollisionForEnvoy checks if the IngressControllerTargetPort collides with the other container ports for envoy deployment
func checkTargetPortsCollisionForEnvoy(kafkaClusterSpec *banzaicloudv1beta1.KafkaClusterSpec) field.ErrorList {
if kafkaClusterSpec.GetIngressController() != "envoy" {
return nil
}

var allErrs field.ErrorList

ap := kafkaClusterSpec.EnvoyConfig.GetEnvoyAdminPort()
hcp := kafkaClusterSpec.EnvoyConfig.GetEnvoyHealthCheckPort()

if ap == hcp {
errmsg := invalidContainerPortForIngressControllerErrMsg + ": The envoy configuration uses an admin port number that collides with the health-check port number"
fldErr := field.Invalid(field.NewPath("spec").Child("envoyConfig").Child("adminPort"), kafkaClusterSpec.EnvoyConfig.GetEnvoyAdminPort(), errmsg)
allErrs = append(allErrs, fldErr)
}

if kafkaClusterSpec.ListenersConfig.ExternalListeners != nil {
for i, extListener := range kafkaClusterSpec.ListenersConfig.ExternalListeners {
// the ingress controller target port only has impact while using LoadBalancer to access the Kafka cluster
if extListener.GetAccessMethod() != corev1.ServiceTypeLoadBalancer {
continue
}

if extListener.GetIngressControllerTargetPort() == ap {
errmsg := invalidContainerPortForIngressControllerErrMsg + ": " + fmt.Sprintf(
"ExternalListener '%s' uses an ingress controller target port number that collides with the envoy's admin port", extListener.Name)
fldErr := field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(i).Child("ingressControllerTargetPort"), extListener.GetIngressControllerTargetPort(), errmsg)
allErrs = append(allErrs, fldErr)
}

if extListener.GetIngressControllerTargetPort() == hcp {
errmsg := invalidContainerPortForIngressControllerErrMsg + ": " + fmt.Sprintf(
"ExternalListener '%s' uses an ingress controller target port number that collides with the envoy's health-check port", extListener.Name)
fldErr := field.Invalid(field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(i).Child("ingressControllerTargetPort"), extListener.GetIngressControllerTargetPort(), errmsg)
allErrs = append(allErrs, fldErr)
}
}
}

return allErrs
}
Loading