Skip to content

Commit

Permalink
[release-1.15] Support explicit protocol configuration in KafkaChanne…
Browse files Browse the repository at this point in the history
…l secret (#4131)

* Support explicit `protocol` configuration in KafkaChannel secret

KafkaChannel has historically used the "legacy" secret format,
however, it has limitations as not having the explitic protocol
configuration, we're left with guessing the protocol.

We will continue supporting the "legacy" format, however,
we will encourage users to use the new secret format, so that
it can also be used for Kafka Broker, KafkaChannel and KafkaSink.

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Remove unnecessary saslType in non-legacy test user secrets

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Use sasl.mechanism in KafkaSource tests

Signed-off-by: Pierangelo Di Pilato <[email protected]>

---------

Signed-off-by: Pierangelo Di Pilato <[email protected]>
Co-authored-by: Pierangelo Di Pilato <[email protected]>
  • Loading branch information
knative-prow-robot and pierDipi authored Oct 17, 2024
1 parent dae1f5f commit 7092bb9
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 15 deletions.
9 changes: 9 additions & 0 deletions control-plane/pkg/reconciler/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,15 @@ func (r *Reconciler) getChannelContractResource(ctx context.Context, topic strin
resource.Auth = &contract.Resource_MultiAuthSecret{
MultiAuthSecret: auth.MultiSecretReference,
}
} else if auth != nil && auth.VirtualSecret != nil {
resource.Auth = &contract.Resource_AuthSecret{
AuthSecret: &contract.Reference{
Uuid: string(auth.VirtualSecret.UID),
Namespace: auth.VirtualSecret.Namespace,
Name: auth.VirtualSecret.Name,
Version: auth.VirtualSecret.ResourceVersion,
},
}
}

if channel.Status.Address != nil && channel.Status.Address.Audience != nil {
Expand Down
21 changes: 10 additions & 11 deletions control-plane/pkg/reconciler/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,22 +240,21 @@ func (r *Reconciler) reconcileAuth(ctx context.Context, c *kafkainternals.Consum
return fmt.Errorf("failed to get secret: %w", err)
}

if _, ok := secret.Data[security.ProtocolKey]; !ok {
authContext, err := security.ResolveAuthContextFromLegacySecret(secret)
if err != nil {
return err
}

authContext, err := security.ResolveAuthContextFromLegacySecret(secret)
if err != nil {
return err
}
if authContext.MultiSecretReference != nil {
resource.Auth = &contract.Resource_MultiAuthSecret{
MultiAuthSecret: authContext.MultiSecretReference,
}
} else {
} else if authContext.VirtualSecret != nil {
resource.Auth = &contract.Resource_AuthSecret{
AuthSecret: &contract.Reference{
Uuid: string(secret.UID),
Namespace: secret.Namespace,
Name: secret.Name,
Version: secret.ResourceVersion,
Uuid: string(authContext.VirtualSecret.UID),
Namespace: authContext.VirtualSecret.Namespace,
Name: authContext.VirtualSecret.Name,
Version: authContext.VirtualSecret.ResourceVersion,
},
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ func ResolveAuthContextFromLegacySecret(s *corev1.Secret) (*NetSpecAuthContext,
return &NetSpecAuthContext{}, nil
}

// Check if the secret is a legacy secret format without the explicit `protocol` key
if v, ok := s.Data[ProtocolKey]; ok && len(v) > 0 {
// The secret is explicitly using `protocol` configuration, no need to guess it.
return &NetSpecAuthContext{VirtualSecret: s}, nil
}

protocolStr, protocolContract := getProtocolFromLegacyChannelSecret(s)

virtualSecret := s.DeepCopy()
Expand Down
1 change: 1 addition & 0 deletions control-plane/pkg/security/secrets_provider_net_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corelisters "k8s.io/client-go/listers/core/v1"

bindings "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/bindings/v1beta1"

"knative.dev/eventing-kafka-broker/control-plane/pkg/contract"
Expand Down
2 changes: 0 additions & 2 deletions test/e2e-common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,6 @@ function create_sasl_secrets() {
--from-literal=user="my-sasl-user" \
--from-literal=protocol="SASL_SSL" \
--from-literal=sasl.mechanism="SCRAM-SHA-512" \
--from-literal=saslType="SCRAM-SHA-512" \
--dry-run=client -o yaml | kubectl apply -n "${SYSTEM_NAMESPACE}" -f -

kubectl create secret --namespace "${SYSTEM_NAMESPACE}" generic strimzi-sasl-secret-legacy \
Expand All @@ -474,7 +473,6 @@ function create_sasl_secrets() {
--from-literal=user="my-sasl-user" \
--from-literal=protocol="SASL_PLAINTEXT" \
--from-literal=sasl.mechanism="SCRAM-SHA-512" \
--from-literal=saslType="SCRAM-SHA-512" \
--dry-run=client -o yaml | kubectl apply -n "${SYSTEM_NAMESPACE}" -f -

kubectl create secret --namespace "${SYSTEM_NAMESPACE}" generic strimzi-sasl-plain-secret-legacy \
Expand Down
2 changes: 1 addition & 1 deletion test/rekt/features/kafka_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func kafkaSourceFeature(name string,
kafkasource.WithSASLEnabled(),
kafkasource.WithSASLUser(secretName, "user"),
kafkasource.WithSASLPassword(secretName, "password"),
kafkasource.WithSASLType(secretName, "saslType"),
kafkasource.WithSASLType(secretName, "sasl.mechanism"),
kafkasource.WithTLSEnabled(),
kafkasource.WithTLSCACert(secretName, "ca.crt"),
)
Expand Down
2 changes: 1 addition & 1 deletion test/rekt/features/kafka_source_create_secrets_after.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func CreateSecretsAfterKafkaSource() *feature.Feature {
kafkasource.WithSASLEnabled(),
kafkasource.WithSASLUser(saslSecretName, "user"),
kafkasource.WithSASLPassword(saslSecretName, "password"),
kafkasource.WithSASLType(saslSecretName, "saslType"),
kafkasource.WithSASLType(saslSecretName, "sasl.mechanism"),
kafkasource.WithTLSEnabled(),
kafkasource.WithTLSCACert(tlsSecretName, "ca.crt"),
))
Expand Down

0 comments on commit 7092bb9

Please sign in to comment.