diff --git a/control-plane/pkg/reconciler/base/reconciler.go b/control-plane/pkg/reconciler/base/reconciler.go index 900a5079f9..2a97824a08 100644 --- a/control-plane/pkg/reconciler/base/reconciler.go +++ b/control-plane/pkg/reconciler/base/reconciler.go @@ -19,7 +19,7 @@ package base import ( "context" "fmt" - "strconv" + "time" "go.uber.org/zap" "google.golang.org/protobuf/encoding/protojson" @@ -67,6 +67,9 @@ const ( // volume generation annotation data plane pods. VolumeGenerationAnnotationKey = "volumeGeneration" + // config features update time annotation for data plane pods. + ConfigFeaturesUpdatedAnnotationKey = "configFeaturesUpdatedAt" + Protobuf = "protobuf" Json = "json" ) @@ -281,23 +284,31 @@ func (r *Reconciler) UpdateDataPlaneConfigMap(ctx context.Context, contract *con return nil } -func (r *Reconciler) UpdateDispatcherPodsAnnotation(ctx context.Context, logger *zap.Logger, volumeGeneration uint64) error { +func (r *Reconciler) UpdateDispatcherPodsContractGenerationAnnotation(ctx context.Context, logger *zap.Logger, volumeGeneration uint64) error { pods, errors := r.PodLister.Pods(r.DataPlaneNamespace).List(r.dispatcherSelector()) if errors != nil { return fmt.Errorf("failed to list dispatcher pods in namespace %s: %w", r.DataPlaneNamespace, errors) } - return r.UpdatePodsAnnotation(ctx, logger, "dispatcher", volumeGeneration, pods) + return r.UpdatePodsAnnotation(ctx, logger, "dispatcher", VolumeGenerationAnnotationKey, fmt.Sprint(volumeGeneration), pods) } -func (r *Reconciler) UpdateReceiverPodsAnnotation(ctx context.Context, logger *zap.Logger, volumeGeneration uint64) error { +func (r *Reconciler) UpdateReceiverPodsContractGenerationAnnotation(ctx context.Context, logger *zap.Logger, volumeGeneration uint64) error { pods, errors := r.PodLister.Pods(r.DataPlaneNamespace).List(r.ReceiverSelector()) if errors != nil { return fmt.Errorf("failed to list receiver pods in namespace %s: %w", r.DataPlaneNamespace, errors) } - return r.UpdatePodsAnnotation(ctx, logger, "receiver", volumeGeneration, pods) + return r.UpdatePodsAnnotation(ctx, logger, "receiver", VolumeGenerationAnnotationKey, fmt.Sprint(volumeGeneration), pods) +} + +func (r *Reconciler) UpdateReceiverConfigFeaturesUpdatedAnnotation(ctx context.Context, logger *zap.Logger) error { + pods, err := r.PodLister.Pods(r.DataPlaneNamespace).List(r.ReceiverSelector()) + if err != nil { + return fmt.Errorf("failed to list receiver pods in namespace %s: %s", r.DataPlaneNamespace, err) + } + return r.UpdatePodsAnnotation(ctx, logger, "receiver", ConfigFeaturesUpdatedAnnotationKey, time.Now().String(), pods) } -func (r *Reconciler) UpdatePodsAnnotation(ctx context.Context, logger *zap.Logger, component string, volumeGeneration uint64, pods []*corev1.Pod) error { +func (r *Reconciler) UpdatePodsAnnotation(ctx context.Context, logger *zap.Logger, component, annotationKey, annotationValue string, pods []*corev1.Pod) error { var errors error @@ -306,7 +317,7 @@ func (r *Reconciler) UpdatePodsAnnotation(ctx context.Context, logger *zap.Logge logger.Debug( "Update "+component+" pod annotation", zap.String("pod", fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)), - zap.Uint64("volumeGeneration", volumeGeneration), + zap.String(annotationKey, annotationValue), ) // do not update cache copy @@ -318,15 +329,15 @@ func (r *Reconciler) UpdatePodsAnnotation(ctx context.Context, logger *zap.Logge } // Check whether pod's annotation is the expected one. - if v, ok := annotations[VolumeGenerationAnnotationKey]; ok { - v, err := strconv.ParseUint(v /* base */, 10 /* bitSize */, 64) - if err == nil && v == volumeGeneration { - // Volume generation already matches the expected volume generation number. + if v, ok := annotations[annotationKey]; ok { + if v == annotationValue { + logger.Debug(component + " pod annotation already up to date") + // annotation is already correct. continue } } - annotations[VolumeGenerationAnnotationKey] = fmt.Sprint(volumeGeneration) + annotations[annotationKey] = annotationValue pod.SetAnnotations(annotations) if _, err := r.KubeClient.CoreV1().Pods(pod.Namespace).Update(ctx, pod, metav1.UpdateOptions{}); err != nil { diff --git a/control-plane/pkg/reconciler/base/reconciler_test.go b/control-plane/pkg/reconciler/base/reconciler_test.go index b84b5d1223..336acf164e 100644 --- a/control-plane/pkg/reconciler/base/reconciler_test.go +++ b/control-plane/pkg/reconciler/base/reconciler_test.go @@ -230,7 +230,7 @@ func TestUpdateReceiverPodAnnotation(t *testing.T) { ReceiverLabel: base.SinkReceiverLabel, } - err := r.UpdateReceiverPodsAnnotation(ctx, logging.FromContext(ctx).Desugar(), 1) + err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logging.FromContext(ctx).Desugar(), 1) require.Nil(t, err) } @@ -247,7 +247,7 @@ func TestUpdateDispatcherPodAnnotation(t *testing.T) { DispatcherLabel: label, } - err := r.UpdateDispatcherPodsAnnotation(ctx, logging.FromContext(ctx).Desugar(), 1) + err := r.UpdateDispatcherPodsContractGenerationAnnotation(ctx, logging.FromContext(ctx).Desugar(), 1) require.Nil(t, err) } diff --git a/control-plane/pkg/reconciler/broker/broker.go b/control-plane/pkg/reconciler/broker/broker.go index 99d80ec67c..2014dc9f13 100644 --- a/control-plane/pkg/reconciler/broker/broker.go +++ b/control-plane/pkg/reconciler/broker/broker.go @@ -207,7 +207,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker) // the update even if here eventually means seconds or minutes after the actual update. // Update volume generation annotation of receiver pods - if err := r.UpdateReceiverPodsAnnotation(ctx, logger, ct.Generation); err != nil { + if err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil { logger.Error("Failed to update receiver pod annotation", zap.Error( statusConditionManager.FailedToUpdateReceiverPodsAnnotation(err), )) @@ -217,7 +217,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker) logger.Debug("Updated receiver pod annotation") // Update volume generation annotation of dispatcher pods - if err := r.UpdateDispatcherPodsAnnotation(ctx, logger, ct.Generation); err != nil { + if err := r.UpdateDispatcherPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil { // Failing to update dispatcher pods annotation leads to config map refresh delayed by several seconds. // Since the dispatcher side is the consumer side, we don't lose availability, and we can consider the Broker // ready. So, log out the error and move on to the next step. @@ -496,11 +496,11 @@ func (r *Reconciler) deleteResourceFromContractConfigMap(ctx context.Context, lo // Note: if there aren't changes to be done at the pod annotation level, we just skip the update. // Update volume generation annotation of receiver pods - if err := r.UpdateReceiverPodsAnnotation(ctx, logger, ct.Generation); err != nil { + if err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil { return err } // Update volume generation annotation of dispatcher pods - if err := r.UpdateDispatcherPodsAnnotation(ctx, logger, ct.Generation); err != nil { + if err := r.UpdateDispatcherPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil { return err } diff --git a/control-plane/pkg/reconciler/broker/controller.go b/control-plane/pkg/reconciler/broker/controller.go index 9501d1fb1e..369894f95c 100644 --- a/control-plane/pkg/reconciler/broker/controller.go +++ b/control-plane/pkg/reconciler/broker/controller.go @@ -102,6 +102,10 @@ func NewController(ctx context.Context, watcher configmap.Watcher, env *config.E if globalResync != nil { globalResync(obj) } + err = reconciler.UpdateReceiverConfigFeaturesUpdatedAnnotation(ctx, logger.Desugar()) + if err != nil { + logger.Warn("config-features updated, but the receiver pods were not successfully annotated. This may lead to features not working as expected.", zap.Error(err)) + } }) featureStore.WatchConfigs(watcher) diff --git a/control-plane/pkg/reconciler/channel/channel.go b/control-plane/pkg/reconciler/channel/channel.go index 3061540895..35af8b9c28 100644 --- a/control-plane/pkg/reconciler/channel/channel.go +++ b/control-plane/pkg/reconciler/channel/channel.go @@ -261,7 +261,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta // the update even if here eventually means seconds or minutes after the actual update. // Update volume generation annotation of receiver pods - if err := r.UpdateReceiverPodsAnnotation(ctx, logger, ct.Generation); err != nil { + if err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil { logger.Error("Failed to update receiver pod annotation", zap.Error( statusConditionManager.FailedToUpdateReceiverPodsAnnotation(err), )) @@ -415,7 +415,7 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, channel *messagingv1beta1 // Note: if there aren't changes to be done at the pod annotation level, we just skip the update. // Update volume generation annotation of receiver pods - if err := r.UpdateReceiverPodsAnnotation(ctx, logger, ct.Generation); err != nil { + if err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil { return err } diff --git a/control-plane/pkg/reconciler/channel/controller.go b/control-plane/pkg/reconciler/channel/controller.go index 1d685e2d10..7e944337a6 100644 --- a/control-plane/pkg/reconciler/channel/controller.go +++ b/control-plane/pkg/reconciler/channel/controller.go @@ -104,6 +104,10 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf if globalResync != nil { globalResync(nil) } + err = reconciler.UpdateReceiverConfigFeaturesUpdatedAnnotation(ctx, logger.Desugar()) + if err != nil { + logger.Warn("config-features updated, but the receiver pods were not successfully annotated. This may lead to features not working as expected.", zap.Error(err)) + } }) featureStore.WatchConfigs(watcher) diff --git a/control-plane/pkg/reconciler/consumer/consumer.go b/control-plane/pkg/reconciler/consumer/consumer.go index 3b645e7bb1..e4d57f7b93 100644 --- a/control-plane/pkg/reconciler/consumer/consumer.go +++ b/control-plane/pkg/reconciler/consumer/consumer.go @@ -435,7 +435,7 @@ func (r *Reconciler) schedule(ctx context.Context, logger *zap.Logger, c *kafkai return false, err } - return true, b.UpdatePodsAnnotation(ctx, logger, "dispatcher" /* component, for logging */, ct.Generation, []*corev1.Pod{p}) + return true, b.UpdatePodsAnnotation(ctx, logger, "dispatcher" /* component, for logging */, base.VolumeGenerationAnnotationKey, fmt.Sprint(ct.Generation), []*corev1.Pod{p}) } func (r *Reconciler) commonReconciler(p *corev1.Pod, cmName string) base.Reconciler { diff --git a/control-plane/pkg/reconciler/sink/controller.go b/control-plane/pkg/reconciler/sink/controller.go index ba70b34ae5..43f2d32675 100644 --- a/control-plane/pkg/reconciler/sink/controller.go +++ b/control-plane/pkg/reconciler/sink/controller.go @@ -86,6 +86,10 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf if globalResync != nil { globalResync(obj) } + err = reconciler.UpdateReceiverConfigFeaturesUpdatedAnnotation(ctx, logger.Desugar()) + if err != nil { + logger.Warn("config-features updated, but the receiver pods were not successfully annotated. This may lead to features not working as expected.", zap.Error(err)) + } }) featureStore.WatchConfigs(watcher) diff --git a/control-plane/pkg/reconciler/sink/kafka_sink.go b/control-plane/pkg/reconciler/sink/kafka_sink.go index 5133b93b08..96c0c823f2 100644 --- a/control-plane/pkg/reconciler/sink/kafka_sink.go +++ b/control-plane/pkg/reconciler/sink/kafka_sink.go @@ -234,7 +234,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink) // receivers haven't got the Sink, so update failures to receiver pods is a hard failure. // Update volume generation annotation of receiver pods - if err := r.UpdateReceiverPodsAnnotation(ctx, logger, ct.Generation); err != nil { + if err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil { return err } @@ -341,7 +341,7 @@ func (r *Reconciler) finalizeKind(ctx context.Context, ks *eventing.KafkaSink) e // Note: if there aren't changes to be done at the pod annotation level, we just skip the update. // Update volume generation annotation of receiver pods - if err := r.UpdateReceiverPodsAnnotation(ctx, logger, ct.Generation); err != nil { + if err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil { return err } diff --git a/control-plane/pkg/reconciler/trigger/trigger.go b/control-plane/pkg/reconciler/trigger/trigger.go index 607945461a..e0b437d957 100644 --- a/control-plane/pkg/reconciler/trigger/trigger.go +++ b/control-plane/pkg/reconciler/trigger/trigger.go @@ -198,7 +198,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, trigger *eventing.Trigge } // Update volume generation annotation of dispatcher pods - if err := r.UpdateDispatcherPodsAnnotation(ctx, logger, ct.Generation); err != nil { + if err := r.UpdateDispatcherPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil { // Failing to update dispatcher pods annotation leads to config map refresh delayed by several seconds. // Since the dispatcher side is the consumer side, we don't lose availability, and we can consider the Trigger // ready. So, log out the error and move on to the next step. @@ -289,7 +289,7 @@ func (r *Reconciler) finalizeKind(ctx context.Context, trigger *eventing.Trigger logger.Debug("Updated data plane config map", zap.String("configmap", r.Env.DataPlaneConfigMapAsString())) // Update volume generation annotation of dispatcher pods - if err := r.UpdateDispatcherPodsAnnotation(ctx, logger, ct.Generation); err != nil { + if err := r.UpdateDispatcherPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil { // Failing to update dispatcher pods annotation leads to config map refresh delayed by several seconds. // The delete trigger will eventually be seen by the data plane pods, so log out the error and move on to the // next step. diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/file/FileWatcher.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/file/FileWatcher.java index 9e3ee1d29a..ca2a91124f 100644 --- a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/file/FileWatcher.java +++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/file/FileWatcher.java @@ -55,7 +55,7 @@ public class FileWatcher implements AutoCloseable { /** * All args constructor. * - * @param contractConsumer updates receiver. + * @param triggerFunction is triggered whenever there is a file change. * @param file file to watch */ public FileWatcher(File file, Runnable triggerFunction) { diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/OIDCDiscoveryConfigListener.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/OIDCDiscoveryConfigListener.java new file mode 100644 index 0000000000..7f0bb717df --- /dev/null +++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/OIDCDiscoveryConfigListener.java @@ -0,0 +1,107 @@ +/* + * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.knative.eventing.kafka.broker.core.oidc; + +import dev.knative.eventing.kafka.broker.core.features.FeaturesConfig; +import dev.knative.eventing.kafka.broker.core.file.FileWatcher; +import io.vertx.core.Vertx; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OIDCDiscoveryConfigListener implements AutoCloseable { + private static final Logger logger = LoggerFactory.getLogger(OIDCDiscoveryConfigListener.class); + private final String featuresConfigPath; + private final Vertx vertx; + private final FileWatcher configFeaturesWatcher; + private final int timeoutSeconds; + private List> callbacks; + private OIDCDiscoveryConfig oidcDiscoveryConfig; + + public OIDCDiscoveryConfigListener(String featuresConfigPath, Vertx vertx, int timeoutSeconds) throws IOException { + this.featuresConfigPath = featuresConfigPath; + this.vertx = vertx; + this.timeoutSeconds = timeoutSeconds; + + this.buildFeaturesAndOIDCDiscoveryConfig(); + + this.configFeaturesWatcher = + new FileWatcher(new File(featuresConfigPath + "/" + FeaturesConfig.KEY_AUTHENTICATION_OIDC), () -> { + if (this.oidcDiscoveryConfig == null) { + this.buildFeaturesAndOIDCDiscoveryConfig(); + if (this.oidcDiscoveryConfig != null && this.callbacks != null) { + this.callbacks.stream() + .filter(Objects::nonNull) + .forEach(c -> c.accept(this.oidcDiscoveryConfig)); + } + } + }); + + this.configFeaturesWatcher.start(); + } + + public OIDCDiscoveryConfig getOidcDiscoveryConfig() { + return oidcDiscoveryConfig; + } + + public int registerCallback(Consumer callback) { + if (this.callbacks == null) { + this.callbacks = new ArrayList<>(); + } + + this.callbacks.add(callback); + return this.callbacks.size() - 1; + } + + public void deregisterCallback(int callbackId) { + this.callbacks.set(callbackId, null); + } + + private void buildOIDCDiscoveryConfig() throws ExecutionException, InterruptedException, TimeoutException { + this.oidcDiscoveryConfig = OIDCDiscoveryConfig.build(this.vertx) + .toCompletionStage() + .toCompletableFuture() + .get(this.timeoutSeconds, TimeUnit.SECONDS); + } + + private void buildFeaturesAndOIDCDiscoveryConfig() { + try { + FeaturesConfig featuresConfig = new FeaturesConfig(featuresConfigPath); + if (featuresConfig.isAuthenticationOIDC()) { + try { + this.buildOIDCDiscoveryConfig(); + } catch (ExecutionException | InterruptedException | TimeoutException e) { + logger.error("Unable to build OIDC Discover Config even though OIDC authentication is enabled", e); + } + } + } catch (IOException e) { + logger.warn("failed to get feature config, skipping building OIDC Discovery Config", e); + } + } + + @Override + public void close() throws Exception { + this.configFeaturesWatcher.close(); + } +} diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java index 468da5784c..f8e268dbb5 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java @@ -23,6 +23,7 @@ import dev.knative.eventing.kafka.broker.core.file.FileWatcher; import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfig; +import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfigListener; import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifier; import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifierImpl; import dev.knative.eventing.kafka.broker.core.reconciler.IngressReconcilerListener; @@ -34,7 +35,6 @@ import dev.knative.eventing.kafka.broker.receiver.impl.handler.MethodNotAllowedHandler; import dev.knative.eventing.kafka.broker.receiver.impl.handler.ProbeHandler; import dev.knative.eventing.kafka.broker.receiver.main.ReceiverEnv; -import io.fabric8.kubernetes.client.*; import io.vertx.core.*; import io.vertx.core.buffer.*; import io.vertx.core.eventbus.MessageConsumer; @@ -88,7 +88,8 @@ public class ReceiverVerticle extends AbstractVerticle implements Handler ingressProducerStoreFactory, final IngressRequestHandler ingressRequestHandler, final String secretVolumePath, - final OIDCDiscoveryConfig oidcDiscoveryConfig) { + final OIDCDiscoveryConfigListener oidcDiscoveryConfigListener) { Objects.requireNonNull(env); Objects.requireNonNull(httpServerOptions); @@ -121,7 +122,7 @@ public ReceiverVerticle( this.secretVolume = new File(secretVolumePath); this.tlsKeyFile = new File(secretVolumePath + "/tls.key"); this.tlsCrtFile = new File(secretVolumePath + "/tls.crt"); - this.oidcDiscoveryConfig = oidcDiscoveryConfig; + this.oidcDiscoveryConfigListener = oidcDiscoveryConfigListener; } public HttpServerOptions getHttpsServerOptions() { @@ -135,8 +136,13 @@ public void start(final Promise startPromise) { .watchIngress(IngressReconcilerListener.all(this.ingressProducerStore, this.ingressRequestHandler)) .buildAndListen(vertx); - TokenVerifier tokenVerifier = new TokenVerifierImpl(vertx, oidcDiscoveryConfig); - this.authenticationHandler = new AuthenticationHandler(tokenVerifier); + // the oidc discovery config is set initially when the listener is started, so we initialize the + // auth handler here, ensuring it is never null + this.buildAuthHandler(oidcDiscoveryConfigListener.getOidcDiscoveryConfig()); + + // the oidc config listener runs the callback whenever the config file is updated + // so, we can be sure that the auth handler always has the up to date config + this.oidcDiscoveryCallbackId = this.oidcDiscoveryConfigListener.registerCallback(this::buildAuthHandler); this.httpServer = vertx.createHttpServer(this.httpServerOptions); @@ -181,6 +187,11 @@ public void start(final Promise startPromise) { setupSecretWatcher(); } + private void buildAuthHandler(OIDCDiscoveryConfig config) { + TokenVerifier tokenVerifier = new TokenVerifierImpl(vertx, config); + this.authenticationHandler = new AuthenticationHandler(tokenVerifier); + } + // Set up the secret watcher private void setupSecretWatcher() { try { @@ -200,6 +211,8 @@ public void stop(Promise stopPromise) throws Exception { .mapEmpty() .onComplete(stopPromise); + this.oidcDiscoveryConfigListener.deregisterCallback(this.oidcDiscoveryCallbackId); + // close the watcher if (this.secretWatcher != null) { try { diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java index c2bc3b0772..f39c499385 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java @@ -22,10 +22,9 @@ import dev.knative.eventing.kafka.broker.core.eventbus.ContractPublisher; import dev.knative.eventing.kafka.broker.core.eventtype.EventType; import dev.knative.eventing.kafka.broker.core.eventtype.EventTypeListerFactory; -import dev.knative.eventing.kafka.broker.core.features.FeaturesConfig; import dev.knative.eventing.kafka.broker.core.file.FileWatcher; import dev.knative.eventing.kafka.broker.core.metrics.Metrics; -import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfig; +import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfigListener; import dev.knative.eventing.kafka.broker.core.reconciler.impl.ResourcesReconcilerMessageHandler; import dev.knative.eventing.kafka.broker.core.tracing.TracingConfig; import dev.knative.eventing.kafka.broker.core.utils.Configurations; @@ -75,8 +74,6 @@ public static void start(final String[] args, final ReactiveProducerFactory kafk OpenTelemetrySdk openTelemetry = TracingConfig.fromDir(env.getConfigTracingPath()).setup(); - FeaturesConfig featuresConfig = new FeaturesConfig(env.getConfigFeaturesPath()); - // Read producer properties and override some defaults Properties producerConfigs = Configurations.readPropertiesSync(env.getProducerConfigFilePath()); producerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); @@ -108,29 +105,12 @@ public static void start(final String[] args, final ReactiveProducerFactory kafk httpsServerOptions.setPort(env.getIngressTLSPort()); httpsServerOptions.setTracingPolicy(TracingPolicy.PROPAGATE); - // Setup OIDC discovery config - OIDCDiscoveryConfig oidcDiscoveryConfig = null; - try { - oidcDiscoveryConfig = OIDCDiscoveryConfig.build(vertx) - .toCompletionStage() - .toCompletableFuture() - .get(); - } catch (Exception ex) { - if (featuresConfig.isAuthenticationOIDC()) { - logger.error("Could not load OIDC config while OIDC authentication feature is enabled."); - throw ex; - } else { - logger.warn( - "Could not load OIDC configuration. This will lead to problems, when the {} flag will be enabled later", - FeaturesConfig.KEY_AUTHENTICATION_OIDC); - } - } - final var kubernetesClient = new KubernetesClientBuilder().build(); final var eventTypeClient = kubernetesClient.resources(EventType.class); final var eventTypeListerFactory = new EventTypeListerFactory(eventTypeClient); - // Configure the verticle to deploy and the deployment options + OIDCDiscoveryConfigListener oidcDiscoveryConfigListener = + new OIDCDiscoveryConfigListener(env.getConfigFeaturesPath(), vertx, env.getWaitStartupSeconds()); try { final Supplier receiverVerticleFactory = new ReceiverVerticleFactory( @@ -142,7 +122,7 @@ public static void start(final String[] args, final ReactiveProducerFactory kafk kafkaProducerFactory, eventTypeClient, vertx, - oidcDiscoveryConfig, + oidcDiscoveryConfigListener, eventTypeListerFactory); DeploymentOptions deploymentOptions = new DeploymentOptions().setInstances(Runtime.getRuntime().availableProcessors()); @@ -161,7 +141,12 @@ public static void start(final String[] args, final ReactiveProducerFactory kafk // Register shutdown hook for graceful shutdown. Shutdown.registerHook( - vertx, publisher, fileWatcher, openTelemetry.getSdkTracerProvider(), eventTypeListerFactory); + vertx, + publisher, + fileWatcher, + openTelemetry.getSdkTracerProvider(), + eventTypeListerFactory, + oidcDiscoveryConfigListener); } catch (final Exception ex) { logger.error("Failed to startup the receiver", ex); diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java index 61404f87b1..be195728cf 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java @@ -19,7 +19,7 @@ import dev.knative.eventing.kafka.broker.core.eventtype.EventType; import dev.knative.eventing.kafka.broker.core.eventtype.EventTypeCreatorImpl; import dev.knative.eventing.kafka.broker.core.eventtype.EventTypeListerFactory; -import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfig; +import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfigListener; import dev.knative.eventing.kafka.broker.core.security.AuthProvider; import dev.knative.eventing.kafka.broker.receiver.IngressRequestHandler; import dev.knative.eventing.kafka.broker.receiver.impl.IngressProducerReconcilableStore; @@ -48,7 +48,7 @@ class ReceiverVerticleFactory implements Supplier { private final String secretVolumePath = "/etc/receiver-tls-secret"; private final IngressRequestHandler ingressRequestHandler; - private final OIDCDiscoveryConfig oidcDiscoveryConfig; + private final OIDCDiscoveryConfigListener oidcDiscoveryConfigListener; private final EventTypeListerFactory eventTypeListerFactory; private ReactiveProducerFactory kafkaProducerFactory; @@ -62,22 +62,20 @@ class ReceiverVerticleFactory implements Supplier { final ReactiveProducerFactory kafkaProducerFactory, final MixedOperation, Resource> eventTypeClient, Vertx vertx, - final OIDCDiscoveryConfig oidcDiscoveryConfig, + final OIDCDiscoveryConfigListener oidcDiscoveryConfigListener, final EventTypeListerFactory eventTypeListerFactory) throws NoSuchAlgorithmException { - { - this.env = env; - this.producerConfigs = producerConfigs; - this.httpServerOptions = httpServerOptions; - this.httpsServerOptions = httpsServerOptions; - this.ingressRequestHandler = new IngressRequestHandlerImpl( - StrictRequestToRecordMapper.getInstance(), - metricsRegistry, - new EventTypeCreatorImpl(eventTypeClient, vertx)); - this.kafkaProducerFactory = kafkaProducerFactory; - this.oidcDiscoveryConfig = oidcDiscoveryConfig; - this.eventTypeListerFactory = eventTypeListerFactory; - } + this.env = env; + this.producerConfigs = producerConfigs; + this.httpServerOptions = httpServerOptions; + this.httpsServerOptions = httpsServerOptions; + this.ingressRequestHandler = new IngressRequestHandlerImpl( + StrictRequestToRecordMapper.getInstance(), + metricsRegistry, + new EventTypeCreatorImpl(eventTypeClient, vertx)); + this.kafkaProducerFactory = kafkaProducerFactory; + this.oidcDiscoveryConfigListener = oidcDiscoveryConfigListener; + this.eventTypeListerFactory = eventTypeListerFactory; } @Override @@ -93,6 +91,6 @@ public Verticle get() { eventTypeListerFactory), this.ingressRequestHandler, secretVolumePath, - oidcDiscoveryConfig); + oidcDiscoveryConfigListener); } } diff --git a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTest.java b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTest.java index c8f383b0a3..e74e2f4962 100644 --- a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTest.java +++ b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTest.java @@ -36,6 +36,7 @@ import dev.knative.eventing.kafka.broker.core.eventbus.ContractPublisher; import dev.knative.eventing.kafka.broker.core.eventtype.EventTypeListerFactory; import dev.knative.eventing.kafka.broker.core.metrics.Metrics; +import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfigListener; import dev.knative.eventing.kafka.broker.core.reconciler.impl.ResourcesReconcilerMessageHandler; import dev.knative.eventing.kafka.broker.core.security.AuthProvider; import dev.knative.eventing.kafka.broker.core.testing.CloudEventSerializerMock; @@ -153,7 +154,7 @@ public void setUpHTTP(final Vertx vertx, final VertxTestContext testContext) { new IngressRequestHandlerImpl( StrictRequestToRecordMapper.getInstance(), registry, ((event, lister, reference) -> null)), SECRET_VOLUME_PATH, - null); + mock(OIDCDiscoveryConfigListener.class)); vertx.deployVerticle(verticle, testContext.succeeding(ar -> testContext.completeNow())); // Connect to the logger in ReceiverVerticle diff --git a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTracingTest.java b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTracingTest.java index 6dcdfe9347..b191d3392e 100644 --- a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTracingTest.java +++ b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTracingTest.java @@ -24,6 +24,7 @@ import dev.knative.eventing.kafka.broker.core.ReactiveKafkaProducer; import dev.knative.eventing.kafka.broker.core.eventtype.EventTypeListerFactory; import dev.knative.eventing.kafka.broker.core.metrics.Metrics; +import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfigListener; import dev.knative.eventing.kafka.broker.core.security.AuthProvider; import dev.knative.eventing.kafka.broker.core.testing.CloudEventSerializerMock; import dev.knative.eventing.kafka.broker.receiver.impl.handler.IngressRequestHandlerImpl; @@ -139,7 +140,7 @@ public void setup() throws ExecutionException, InterruptedException { Metrics.getRegistry(), ((event, lister, reference) -> null)), SECRET_VOLUME_PATH, - null); + mock(OIDCDiscoveryConfigListener.class)); vertx.deployVerticle(verticle).toCompletionStage().toCompletableFuture().get(); } diff --git a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactoryTest.java b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactoryTest.java index d42df7f245..fa28453b14 100644 --- a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactoryTest.java +++ b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactoryTest.java @@ -22,7 +22,7 @@ import dev.knative.eventing.kafka.broker.core.eventtype.EventType; import dev.knative.eventing.kafka.broker.core.eventtype.EventTypeListerFactory; import dev.knative.eventing.kafka.broker.core.metrics.Metrics; -import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfig; +import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfigListener; import dev.knative.eventing.kafka.broker.receiver.MockReactiveProducerFactory; import io.fabric8.kubernetes.client.KubernetesClient; import io.micrometer.core.instrument.MeterRegistry; @@ -55,7 +55,7 @@ public void shouldCreateMultipleReceiverVerticleInstances(Vertx vertx) throws No mock(MockReactiveProducerFactory.class), mockClient.resources(EventType.class), vertx, - mock(OIDCDiscoveryConfig.class), + mock(OIDCDiscoveryConfigListener.class), mock(EventTypeListerFactory.class)); assertThat(supplier.get()).isNotSameAs(supplier.get()); diff --git a/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java b/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java index 5e069cff59..d18e042724 100644 --- a/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java +++ b/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java @@ -33,6 +33,7 @@ import dev.knative.eventing.kafka.broker.core.eventbus.ContractPublisher; import dev.knative.eventing.kafka.broker.core.eventtype.EventTypeListerFactory; import dev.knative.eventing.kafka.broker.core.metrics.Metrics; +import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfigListener; import dev.knative.eventing.kafka.broker.core.reconciler.impl.ResourcesReconcilerMessageHandler; import dev.knative.eventing.kafka.broker.core.security.AuthProvider; import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.CloudEventDeserializer; @@ -399,7 +400,7 @@ private ReceiverVerticle setUpReceiver(final Vertx vertx, final VertxTestContext Metrics.getRegistry(), (((event, lister, reference) -> null))), SECRET_VOLUME_PATH, - null); + mock(OIDCDiscoveryConfigListener.class)); final CountDownLatch latch = new CountDownLatch(1); vertx.deployVerticle(verticle, context.succeeding(h -> latch.countDown()));