diff --git a/control-plane/pkg/contract/contract.pb.go b/control-plane/pkg/contract/contract.pb.go index c319671526..69cfa27d7b 100644 --- a/control-plane/pkg/contract/contract.pb.go +++ b/control-plane/pkg/contract/contract.pb.go @@ -7,10 +7,11 @@ package contract import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" sync "sync" + + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" ) const ( diff --git a/control-plane/pkg/reconciler/sink/kafka_sink.go b/control-plane/pkg/reconciler/sink/kafka_sink.go index 96c0c823f2..29d69cb06f 100644 --- a/control-plane/pkg/reconciler/sink/kafka_sink.go +++ b/control-plane/pkg/reconciler/sink/kafka_sink.go @@ -21,6 +21,9 @@ import ( "fmt" "time" + "knative.dev/eventing/pkg/auth" + "knative.dev/pkg/logging" + "github.com/IBM/sarama" "go.uber.org/zap" "k8s.io/apimachinery/pkg/types" @@ -240,9 +243,9 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink) logger.Debug("Updated receiver pod annotation") - transportEncryptionFlags := feature.FromContext(ctx) + features := feature.FromContext(ctx) var addressableStatus duckv1.AddressStatus - if transportEncryptionFlags.IsPermissiveTransportEncryption() { + if features.IsPermissiveTransportEncryption() { caCerts, err := r.getCaCerts() if err != nil { return err @@ -257,7 +260,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink) // - http address with path-based routing addressableStatus.Address = &httpAddress addressableStatus.Addresses = []duckv1.Addressable{httpsAddress, httpAddress} - } else if transportEncryptionFlags.IsStrictTransportEncryption() { + } else if features.IsStrictTransportEncryption() { // Strict mode: (only https addresses) // - status.address https address with path-based routing // - status.addresses: @@ -296,6 +299,25 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink) ks.Status.AddressStatus = addressableStatus + if features.IsOIDCAuthentication() { + audience := auth.GetAudience(eventing.SchemeGroupVersion.WithKind("KafkaSink"), ks.ObjectMeta) + logging.FromContext(ctx).Debugw("Setting the kafkasinks audience", zap.String("audience", audience)) + ks.Status.Address.Audience = &audience + + for i := range ks.Status.Addresses { + ks.Status.Addresses[i].Audience = &audience + } + } else { + logging.FromContext(ctx).Debug("Clearing the kafkasinks audience as OIDC is not enabled") + if ks.Status.Address != nil { + ks.Status.Address.Audience = nil + } + + for i := range ks.Status.Addresses { + ks.Status.Addresses[i].Audience = nil + } + } + ks.GetConditionSet().Manage(ks.GetStatus()).MarkTrue(base.ConditionAddressable) return nil diff --git a/control-plane/pkg/reconciler/sink/kafka_sink_test.go b/control-plane/pkg/reconciler/sink/kafka_sink_test.go index 298641b446..3f6e36d1ac 100644 --- a/control-plane/pkg/reconciler/sink/kafka_sink_test.go +++ b/control-plane/pkg/reconciler/sink/kafka_sink_test.go @@ -22,6 +22,8 @@ import ( "io" "testing" + "knative.dev/eventing/pkg/auth" + "k8s.io/utils/pointer" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -49,6 +51,7 @@ import ( "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing" "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing/v1alpha1" + kafkaeventing "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing/v1alpha1" fakeeventingkafkaclient "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/client/fake" sinkreconciler "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/reconciler/eventing/v1alpha1/kafkasink" "knative.dev/eventing-kafka-broker/control-plane/pkg/receiver" @@ -95,6 +98,11 @@ var ( Path: fmt.Sprintf("/%s/%s", SinkNamespace, SinkName), } + sinkAudience = auth.GetAudience(kafkaeventing.SchemeGroupVersion.WithKind("KafkaSink"), metav1.ObjectMeta{ + Name: SinkName, + Namespace: SinkNamespace, + }) + errCreateTopic = fmt.Errorf("failed to create topic") errDeleteTopic = fmt.Errorf("failed to delete topic") @@ -1283,6 +1291,73 @@ func sinkReconciliation(t *testing.T, format string, env config.Env) { ), }, }, + }, { + Name: "Reconciled normal - OIDC enabled - should provision audience", + Ctx: feature.ToContext(context.Background(), feature.Flags{ + feature.OIDCAuthentication: feature.Enabled, + }), + Objects: []runtime.Object{ + NewSink( + StatusControllerOwnsTopic(sink.ControllerTopicOwner), + ), + NewConfigMapWithBinaryData(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, nil), + SinkReceiverPod(env.SystemNamespace, map[string]string{ + "annotation_to_preserve": "value_to_preserve", + }), + }, + Key: testKey, + WantEvents: []string{ + finalizerUpdatedEvent, + }, + WantUpdates: []clientgotesting.UpdateActionImpl{ + ConfigMapUpdate(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, env.ContractConfigMapFormat, &contract.Contract{ + Resources: []*contract.Resource{ + { + Uid: SinkUUID, + Topics: []string{SinkTopic()}, + Ingress: &contract.Ingress{ContentMode: contract.ContentMode_BINARY, Path: receiver.Path(SinkNamespace, SinkName)}, + BootstrapServers: bootstrapServers, + Reference: SinkReference(), + }, + }, + Generation: 1, + }), + SinkReceiverPodUpdate(env.SystemNamespace, map[string]string{ + base.VolumeGenerationAnnotationKey: "1", + "annotation_to_preserve": "value_to_preserve", + }), + }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{ + { + Object: NewSink( + StatusControllerOwnsTopic(sink.ControllerTopicOwner), + InitSinkConditions, + StatusDataPlaneAvailable, + StatusConfigParsed, + BootstrapServers(bootstrapServersArr), + StatusConfigMapUpdatedReady(&env), + StatusTopicReadyWithOwner(SinkTopic(), sink.ControllerTopicOwner), + SinkAddressable(&env), + StatusProbeSucceeded, + WithSinkAddress(duckv1.Addressable{ + Name: pointer.String("http"), + URL: sinkAddress, + Audience: &sinkAudience, + }), + WithSinkAddresses([]duckv1.Addressable{ + { + Name: pointer.String("http"), + URL: sinkAddress, + Audience: &sinkAudience, + }, + }), + WithSinkAddessable(), + ), + }, + }, }, } diff --git a/test/e2e_new/sink_auth_test.go b/test/e2e_new/sink_auth_test.go new file mode 100644 index 0000000000..5b9b41ba83 --- /dev/null +++ b/test/e2e_new/sink_auth_test.go @@ -0,0 +1,58 @@ +//go:build e2e +// +build e2e + +/* + * Copyright 2023 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package e2e_new + +import ( + "testing" + "time" + + "knative.dev/eventing-kafka-broker/test/rekt/resources/kafkasink" + "knative.dev/eventing-kafka-broker/test/rekt/resources/kafkatopic" + + testpkg "knative.dev/eventing-kafka-broker/test/pkg" + "knative.dev/eventing/test/rekt/features/oidc" + "knative.dev/pkg/system" + "knative.dev/reconciler-test/pkg/environment" + "knative.dev/reconciler-test/pkg/eventshub" + "knative.dev/reconciler-test/pkg/feature" + "knative.dev/reconciler-test/pkg/k8s" + "knative.dev/reconciler-test/pkg/knative" +) + +func TestKafkaSinkSupportsOIDC(t *testing.T) { + t.Parallel() + + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.WithPollTimings(4*time.Second, 12*time.Minute), + environment.Managed(t), + eventshub.WithTLS(t), + ) + + topic := feature.MakeRandomK8sName("topic") + sink := feature.MakeRandomK8sName("kafkasink") + env.Prerequisite(ctx, t, kafkatopic.GoesReady(topic)) + env.Prerequisite(ctx, t, kafkasink.GoesReady(sink, topic, testpkg.BootstrapServersPlaintextArr)) + + env.TestSet(ctx, t, oidc.AddressableOIDCConformance(kafkasink.GVR(), "KafkaSink", sink, env.Namespace())) +} diff --git a/test/rekt/resources/kafkasink/kafkasink.go b/test/rekt/resources/kafkasink/kafkasink.go index 158eeae2e1..188202b00c 100644 --- a/test/rekt/resources/kafkasink/kafkasink.go +++ b/test/rekt/resources/kafkasink/kafkasink.go @@ -116,3 +116,14 @@ func ValidateAddress(name string, validate addressable.ValidateAddressFn, timing } } } + +// GoesReady returns a feature that will create a KafkaSink of the given +// name and topic, and confirm it becomes ready. +func GoesReady(name, topic string, bootstrapServers []string, cfg ...manifest.CfgFn) *feature.Feature { + f := new(feature.Feature) + + f.Setup(fmt.Sprintf("install KafkaSink %q", name), Install(name, topic, bootstrapServers, cfg...)) + f.Setup("KafkaSink is ready", IsReady(name)) + + return f +} diff --git a/test/rekt/resources/kafkatopic/topic.go b/test/rekt/resources/kafkatopic/topic.go index 544e547a9d..62f46a0497 100644 --- a/test/rekt/resources/kafkatopic/topic.go +++ b/test/rekt/resources/kafkatopic/topic.go @@ -172,3 +172,14 @@ func WithClusterNamespace(namespace string) manifest.CfgFn { cfg["clusterNamespace"] = namespace } } + +// GoesReady returns a feature that will create a topic of the given +// name and confirm it becomes ready. +func GoesReady(name string, cfg ...manifest.CfgFn) *feature.Feature { + f := new(feature.Feature) + + f.Setup(fmt.Sprintf("install Topic %q", name), Install(name, cfg...)) + f.Setup("Topic is ready", IsReady(name)) + + return f +}