From 123e6c1a1b0eec7e4f704dcebd5560d5d97c62b8 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Wed, 31 Jul 2024 14:37:36 -0400 Subject: [PATCH] fix: do not re-deploy verticles Signed-off-by: Calum Murray --- .../oidc/OIDCDiscoveryConfigListener.java | 110 ++++++++++ .../receiver/impl/ReceiverVerticle.java | 20 +- .../kafka/broker/receiver/main/Main.java | 67 +++++- .../receiver/main/ReceiverDeployer.java | 191 ------------------ .../main/ReceiverVerticleFactory.java | 10 +- .../receiver/impl/ReceiverVerticleTest.java | 3 +- .../impl/ReceiverVerticleTracingTest.java | 3 +- .../main/ReceiverVerticleFactoryTest.java | 4 +- .../broker/tests/AbstractDataPlaneTest.java | 3 +- 9 files changed, 194 insertions(+), 217 deletions(-) create mode 100644 data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/OIDCDiscoveryConfigListener.java delete mode 100644 data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverDeployer.java diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/OIDCDiscoveryConfigListener.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/OIDCDiscoveryConfigListener.java new file mode 100644 index 0000000000..cb550fd368 --- /dev/null +++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/OIDCDiscoveryConfigListener.java @@ -0,0 +1,110 @@ +/* + * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.knative.eventing.kafka.broker.core.oidc; + +import dev.knative.eventing.kafka.broker.core.features.FeaturesConfig; +import dev.knative.eventing.kafka.broker.core.file.FileWatcher; +import io.vertx.core.Vertx; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OIDCDiscoveryConfigListener implements AutoCloseable { + private static final Logger logger = LoggerFactory.getLogger(OIDCDiscoveryConfigListener.class); + private final String featuresConfigPath; + private final Vertx vertx; + private final FileWatcher configFeaturesWatcher; + private final int timeoutSeconds; + private List> callbacks; + private OIDCDiscoveryConfig oidcDiscoveryConfig; + + public OIDCDiscoveryConfigListener(String featuresConfigPath, Vertx vertx, int timeoutSeconds) throws IOException { + this.featuresConfigPath = featuresConfigPath; + this.vertx = vertx; + this.timeoutSeconds = timeoutSeconds; + + this.oidcDiscoveryConfig = this.buildFeaturesAndOIDCDiscoveryConfig(); + + this.configFeaturesWatcher = new FileWatcher(new File(featuresConfigPath), () -> { + if (this.oidcDiscoveryConfig == null) { + this.oidcDiscoveryConfig = this.buildFeaturesAndOIDCDiscoveryConfig(); + if (this.oidcDiscoveryConfig != null && this.callbacks != null) { + this.callbacks.stream().filter(Objects::nonNull).forEach(c -> c.accept(this.oidcDiscoveryConfig)); + } + } + }); + + this.configFeaturesWatcher.start(); + } + + public OIDCDiscoveryConfig getOidcDiscoveryConfig() { + return oidcDiscoveryConfig; + } + + public int registerCallback(Consumer callback) { + if (this.callbacks == null) { + this.callbacks = new ArrayList<>(); + } + + this.callbacks.add(callback); + + return this.callbacks.size(); + } + + public void deregisterCallback(int callbackId) { + this.callbacks.set(callbackId, null); + } + + private OIDCDiscoveryConfig buildOIDCDiscoveryConfig() + throws ExecutionException, InterruptedException, TimeoutException { + this.oidcDiscoveryConfig = OIDCDiscoveryConfig.build(this.vertx) + .toCompletionStage() + .toCompletableFuture() + .get(this.timeoutSeconds, TimeUnit.SECONDS); + + return this.oidcDiscoveryConfig; + } + + private OIDCDiscoveryConfig buildFeaturesAndOIDCDiscoveryConfig() { + try { + FeaturesConfig featuresConfig = new FeaturesConfig(featuresConfigPath); + if (featuresConfig.isAuthenticationOIDC()) { + try { + return this.buildOIDCDiscoveryConfig(); + } catch (ExecutionException | InterruptedException | TimeoutException e) { + logger.error("Unable to build OIDC Discover Config even though OIDC authentication is enabled", e); + } + } + } catch (IOException e) { + logger.warn("failed to get feature config, skipping building OIDC Discovery Config", e); + } + + return null; + } + + @Override + public void close() throws Exception { + this.configFeaturesWatcher.close(); + } +} diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java index 468da5784c..142a841231 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java @@ -23,6 +23,7 @@ import dev.knative.eventing.kafka.broker.core.file.FileWatcher; import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfig; +import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfigListener; import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifier; import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifierImpl; import dev.knative.eventing.kafka.broker.core.reconciler.IngressReconcilerListener; @@ -34,7 +35,6 @@ import dev.knative.eventing.kafka.broker.receiver.impl.handler.MethodNotAllowedHandler; import dev.knative.eventing.kafka.broker.receiver.impl.handler.ProbeHandler; import dev.knative.eventing.kafka.broker.receiver.main.ReceiverEnv; -import io.fabric8.kubernetes.client.*; import io.vertx.core.*; import io.vertx.core.buffer.*; import io.vertx.core.eventbus.MessageConsumer; @@ -88,7 +88,8 @@ public class ReceiverVerticle extends AbstractVerticle implements Handler ingressProducerStoreFactory, final IngressRequestHandler ingressRequestHandler, final String secretVolumePath, - final OIDCDiscoveryConfig oidcDiscoveryConfig) { + final OIDCDiscoveryConfigListener oidcDiscoveryConfigListener) { Objects.requireNonNull(env); Objects.requireNonNull(httpServerOptions); @@ -121,7 +122,7 @@ public ReceiverVerticle( this.secretVolume = new File(secretVolumePath); this.tlsKeyFile = new File(secretVolumePath + "/tls.key"); this.tlsCrtFile = new File(secretVolumePath + "/tls.crt"); - this.oidcDiscoveryConfig = oidcDiscoveryConfig; + this.oidcDiscoveryConfigListener = oidcDiscoveryConfigListener; } public HttpServerOptions getHttpsServerOptions() { @@ -135,8 +136,8 @@ public void start(final Promise startPromise) { .watchIngress(IngressReconcilerListener.all(this.ingressProducerStore, this.ingressRequestHandler)) .buildAndListen(vertx); - TokenVerifier tokenVerifier = new TokenVerifierImpl(vertx, oidcDiscoveryConfig); - this.authenticationHandler = new AuthenticationHandler(tokenVerifier); + this.buildAuthHandler(oidcDiscoveryConfigListener.getOidcDiscoveryConfig()); + this.oidcDiscoveryCallbackId = this.oidcDiscoveryConfigListener.registerCallback(this::buildAuthHandler); this.httpServer = vertx.createHttpServer(this.httpServerOptions); @@ -181,6 +182,11 @@ public void start(final Promise startPromise) { setupSecretWatcher(); } + private void buildAuthHandler(OIDCDiscoveryConfig config) { + TokenVerifier tokenVerifier = new TokenVerifierImpl(vertx, config); + this.authenticationHandler = new AuthenticationHandler(tokenVerifier); + } + // Set up the secret watcher private void setupSecretWatcher() { try { @@ -200,6 +206,8 @@ public void stop(Promise stopPromise) throws Exception { .mapEmpty() .onComplete(stopPromise); + this.oidcDiscoveryConfigListener.deregisterCallback(this.oidcDiscoveryCallbackId); + // close the watcher if (this.secretWatcher != null) { try { diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java index 11f688ed27..fc78b5f420 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java @@ -19,26 +19,38 @@ import dev.knative.eventing.kafka.broker.core.ReactiveProducerFactory; import dev.knative.eventing.kafka.broker.core.eventbus.ContractMessageCodec; +import dev.knative.eventing.kafka.broker.core.eventbus.ContractPublisher; import dev.knative.eventing.kafka.broker.core.eventtype.EventType; +import dev.knative.eventing.kafka.broker.core.features.FeaturesConfig; +import dev.knative.eventing.kafka.broker.core.file.FileWatcher; import dev.knative.eventing.kafka.broker.core.metrics.Metrics; +import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfigListener; +import dev.knative.eventing.kafka.broker.core.reconciler.impl.ResourcesReconcilerMessageHandler; import dev.knative.eventing.kafka.broker.core.tracing.TracingConfig; import dev.knative.eventing.kafka.broker.core.utils.Configurations; +import dev.knative.eventing.kafka.broker.core.utils.Shutdown; import io.cloudevents.kafka.CloudEventSerializer; import io.cloudevents.kafka.PartitionKeyExtensionInterceptor; import io.fabric8.kubernetes.client.KubernetesClientBuilder; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.fabric8.kubernetes.client.informers.SharedInformerFactory; import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.vertx.core.DeploymentOptions; +import io.vertx.core.Verticle; import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; import io.vertx.core.http.HttpServerOptions; import io.vertx.core.tracing.TracingPolicy; import io.vertx.tracing.opentelemetry.OpenTelemetryOptions; +import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; @@ -115,17 +127,52 @@ public static void start(final String[] args, final ReactiveProducerFactory kafk informerException); } - final ReceiverDeployer receiverDeployer = new ReceiverDeployer( - kafkaProducerFactory, - env, - producerConfigs, + OIDCDiscoveryConfigListener oidcDiscoveryConfigListener = new OIDCDiscoveryConfigListener( + env.getConfigFeaturesPath() + "/" + FeaturesConfig.KEY_AUTHENTICATION_OIDC, vertx, - httpServerOptions, - httpsServerOptions, - eventTypeClient, - eventTypeInformer, - openTelemetry); + env.getWaitStartupSeconds()); - receiverDeployer.run(); + try { + final Supplier receiverVerticleFactory = new ReceiverVerticleFactory( + env, + producerConfigs, + Metrics.getRegistry(), + httpServerOptions, + httpsServerOptions, + kafkaProducerFactory, + eventTypeClient, + eventTypeInformer, + vertx, + oidcDiscoveryConfigListener); + DeploymentOptions deploymentOptions = + new DeploymentOptions().setInstances(Runtime.getRuntime().availableProcessors()); + // Deploy the receiver verticles + vertx.deployVerticle(receiverVerticleFactory, deploymentOptions) + .toCompletionStage() + .toCompletableFuture() + .get(env.getWaitStartupSeconds(), TimeUnit.SECONDS); + logger.info("Receiver started"); + + ContractPublisher publisher = + new ContractPublisher(vertx.eventBus(), ResourcesReconcilerMessageHandler.ADDRESS); + File file = new File(env.getDataPlaneConfigFilePath()); + FileWatcher fileWatcher = new FileWatcher(file, () -> publisher.updateContract(file)); + fileWatcher.start(); + + var closeables = new ArrayList<>(Arrays.asList( + publisher, fileWatcher, openTelemetry.getSdkTracerProvider(), oidcDiscoveryConfigListener)); + + if (eventTypeInformer != null) { + closeables.add(eventTypeInformer); + } + + // Register shutdown hook for graceful shutdown. + Shutdown.registerHook(vertx, closeables.toArray(new AutoCloseable[0])); + + } catch (final Exception ex) { + logger.error("Failed to startup the receiver", ex); + Shutdown.closeVertxSync(vertx); + System.exit(1); + } } } diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverDeployer.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverDeployer.java deleted file mode 100644 index 4c7ae9d6a4..0000000000 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverDeployer.java +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dev.knative.eventing.kafka.broker.receiver.main; - -import dev.knative.eventing.kafka.broker.core.ReactiveProducerFactory; -import dev.knative.eventing.kafka.broker.core.eventbus.ContractPublisher; -import dev.knative.eventing.kafka.broker.core.eventtype.EventType; -import dev.knative.eventing.kafka.broker.core.features.FeaturesConfig; -import dev.knative.eventing.kafka.broker.core.file.FileWatcher; -import dev.knative.eventing.kafka.broker.core.metrics.Metrics; -import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfig; -import dev.knative.eventing.kafka.broker.core.reconciler.impl.ResourcesReconcilerMessageHandler; -import dev.knative.eventing.kafka.broker.core.utils.Shutdown; -import io.fabric8.kubernetes.api.model.KubernetesResourceList; -import io.fabric8.kubernetes.client.dsl.MixedOperation; -import io.fabric8.kubernetes.client.dsl.Resource; -import io.fabric8.kubernetes.client.informers.SharedIndexInformer; -import io.opentelemetry.sdk.OpenTelemetrySdk; -import io.vertx.core.DeploymentOptions; -import io.vertx.core.Verticle; -import io.vertx.core.Vertx; -import io.vertx.core.http.HttpServerOptions; -import java.io.File; -import java.io.IOException; -import java.security.NoSuchAlgorithmException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Properties; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.function.Supplier; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ReceiverDeployer { - - private final ReactiveProducerFactory kafkaProducerFactory; - private final ReceiverEnv env; - private final Properties producerConfigs; - private final Vertx vertx; - private final HttpServerOptions httpServerOptions; - private final HttpServerOptions httpsServerOptions; - private final MixedOperation, Resource> eventTypeClient; - private final SharedIndexInformer eventTypeInformer; - private FeaturesConfig featuresConfig; - private OIDCDiscoveryConfig oidcDiscoveryConfig; - private final OpenTelemetrySdk openTelemetry; - private String deploymentId; - private FileWatcher featuresConfigWatcher; - - private static final Logger logger = LoggerFactory.getLogger(ReceiverDeployer.class); - - public ReceiverDeployer( - final ReactiveProducerFactory kafkaProducerFactory, - final ReceiverEnv env, - final Properties producerConfigs, - final Vertx vertx, - final HttpServerOptions httpServerOptions, - final HttpServerOptions httpsServerOptions, - final MixedOperation, Resource> eventTypeClient, - final SharedIndexInformer eventTypeInformer, - OpenTelemetrySdk openTelemetry) - throws IOException, ExecutionException, InterruptedException { - this.kafkaProducerFactory = kafkaProducerFactory; - this.env = env; - this.producerConfigs = producerConfigs; - this.vertx = vertx; - this.httpServerOptions = httpServerOptions; - this.httpsServerOptions = httpsServerOptions; - this.eventTypeClient = eventTypeClient; - this.eventTypeInformer = eventTypeInformer; - this.featuresConfig = new FeaturesConfig(env.getConfigFeaturesPath()); - this.openTelemetry = openTelemetry; - this.featuresConfigWatcher = new FileWatcher( - new File(env.getConfigFeaturesPath() + "/" + FeaturesConfig.KEY_AUTHENTICATION_OIDC), () -> { - logger.info("config features updated, reloading"); - try { - this.featuresConfig = new FeaturesConfig(env.getConfigFeaturesPath()); - if (this.featuresConfig.isAuthenticationOIDC() && this.oidcDiscoveryConfig == null) { - logger.info("building OIDC config"); - this.buildOIDCDiscoveryConfig(); - logger.info("undeploying verticles"); - this.undeploy(); - logger.info("deploying verticles again with new oidc config"); - this.deploy(); - } - } catch (IOException - | ExecutionException - | InterruptedException - | NoSuchAlgorithmException - | TimeoutException e) { - logger.warn("features config file updated but was unable to rebuild the config", e); - } - }); - if (this.featuresConfig.isAuthenticationOIDC()) { - this.buildOIDCDiscoveryConfig(); - } - } - - public void buildOIDCDiscoveryConfig() throws ExecutionException, InterruptedException { - try { - this.oidcDiscoveryConfig = OIDCDiscoveryConfig.build(this.vertx) - .toCompletionStage() - .toCompletableFuture() - .get(env.getWaitStartupSeconds(), TimeUnit.SECONDS); - } catch (Exception ex) { - logger.error("Could not load OIDC config with OIDC authentication feature is enabled.", ex); - Shutdown.closeVertxSync(vertx); - System.exit(1); - } - } - - public void run() { - try { - this.featuresConfigWatcher.start(); - } catch (IOException ex) { - logger.warn("failed to start features config watcher"); - } - - try { - // deploy the receiver verticles - this.deploymentId = this.deploy(); - logger.info("Receiver started"); - - // set up contract publisher - ContractPublisher publisher = - new ContractPublisher(vertx.eventBus(), ResourcesReconcilerMessageHandler.ADDRESS); - File file = new File(env.getDataPlaneConfigFilePath()); - FileWatcher fileWatcher = new FileWatcher(file, () -> publisher.updateContract(file)); - fileWatcher.start(); - - var closeables = - new ArrayList<>(Arrays.asList(publisher, fileWatcher, openTelemetry.getSdkTracerProvider())); - - if (eventTypeInformer != null) { - closeables.add(eventTypeInformer); - } - - // Register shutdown hook for graceful shutdown. - Shutdown.registerHook(vertx, closeables.toArray(new AutoCloseable[0])); - } catch (final Exception ex) { - logger.error("Failed to startup the receiver", ex); - Shutdown.closeVertxSync(vertx); - System.exit(1); - } - } - - private String deploy() - throws ExecutionException, InterruptedException, TimeoutException, NoSuchAlgorithmException { - final Supplier receiverVerticleFactory = new ReceiverVerticleFactory( - env, - producerConfigs, - Metrics.getRegistry(), - httpServerOptions, - httpsServerOptions, - kafkaProducerFactory, - eventTypeClient, - eventTypeInformer, - vertx, - oidcDiscoveryConfig); - DeploymentOptions deploymentOptions = - new DeploymentOptions().setInstances(Runtime.getRuntime().availableProcessors()); - // Deploy the receiver verticles - return vertx.deployVerticle(receiverVerticleFactory, deploymentOptions) - .toCompletionStage() - .toCompletableFuture() - .get(env.getWaitStartupSeconds(), TimeUnit.SECONDS); - } - - private void undeploy() { - if (this.deploymentId == null || this.deploymentId.isEmpty()) { - return; - } - - this.vertx.undeploy(this.deploymentId); - } -} diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java index 9cb8b26518..2867d39f01 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java @@ -19,7 +19,7 @@ import dev.knative.eventing.kafka.broker.core.eventtype.EventType; import dev.knative.eventing.kafka.broker.core.eventtype.EventTypeCreatorImpl; import dev.knative.eventing.kafka.broker.core.eventtype.EventTypeListerFactory; -import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfig; +import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfigListener; import dev.knative.eventing.kafka.broker.core.security.AuthProvider; import dev.knative.eventing.kafka.broker.receiver.IngressRequestHandler; import dev.knative.eventing.kafka.broker.receiver.impl.IngressProducerReconcilableStore; @@ -49,7 +49,7 @@ class ReceiverVerticleFactory implements Supplier { private final String secretVolumePath = "/etc/receiver-tls-secret"; private final IngressRequestHandler ingressRequestHandler; - private final OIDCDiscoveryConfig oidcDiscoveryConfig; + private final OIDCDiscoveryConfigListener oidcDiscoveryConfigListener; private ReactiveProducerFactory kafkaProducerFactory; @@ -63,7 +63,7 @@ class ReceiverVerticleFactory implements Supplier { final MixedOperation, Resource> eventTypeClient, final SharedIndexInformer eventTypeInformer, Vertx vertx, - final OIDCDiscoveryConfig oidcDiscoveryConfig) + final OIDCDiscoveryConfigListener oidcDiscoveryConfigListener) throws NoSuchAlgorithmException { { this.env = env; @@ -75,7 +75,7 @@ class ReceiverVerticleFactory implements Supplier { metricsRegistry, new EventTypeCreatorImpl(eventTypeClient, new EventTypeListerFactory(eventTypeInformer), vertx)); this.kafkaProducerFactory = kafkaProducerFactory; - this.oidcDiscoveryConfig = oidcDiscoveryConfig; + this.oidcDiscoveryConfigListener = oidcDiscoveryConfigListener; } } @@ -91,6 +91,6 @@ public Verticle get() { properties -> kafkaProducerFactory.create(v, properties)), this.ingressRequestHandler, secretVolumePath, - oidcDiscoveryConfig); + oidcDiscoveryConfigListener); } } diff --git a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTest.java b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTest.java index 480f3eeda5..c290f26855 100644 --- a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTest.java +++ b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTest.java @@ -35,6 +35,7 @@ import dev.knative.eventing.kafka.broker.core.eventbus.ContractMessageCodec; import dev.knative.eventing.kafka.broker.core.eventbus.ContractPublisher; import dev.knative.eventing.kafka.broker.core.metrics.Metrics; +import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfigListener; import dev.knative.eventing.kafka.broker.core.reconciler.impl.ResourcesReconcilerMessageHandler; import dev.knative.eventing.kafka.broker.core.security.AuthProvider; import dev.knative.eventing.kafka.broker.core.testing.CloudEventSerializerMock; @@ -151,7 +152,7 @@ public void setUpHTTP(final Vertx vertx, final VertxTestContext testContext) { new IngressRequestHandlerImpl( StrictRequestToRecordMapper.getInstance(), registry, ((event, reference) -> null)), SECRET_VOLUME_PATH, - null); + mock(OIDCDiscoveryConfigListener.class)); vertx.deployVerticle(verticle, testContext.succeeding(ar -> testContext.completeNow())); // Connect to the logger in ReceiverVerticle diff --git a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTracingTest.java b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTracingTest.java index db46ddc3c1..eea98a4e24 100644 --- a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTracingTest.java +++ b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTracingTest.java @@ -23,6 +23,7 @@ import dev.knative.eventing.kafka.broker.contract.DataPlaneContract; import dev.knative.eventing.kafka.broker.core.ReactiveKafkaProducer; import dev.knative.eventing.kafka.broker.core.metrics.Metrics; +import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfigListener; import dev.knative.eventing.kafka.broker.core.security.AuthProvider; import dev.knative.eventing.kafka.broker.core.testing.CloudEventSerializerMock; import dev.knative.eventing.kafka.broker.receiver.impl.handler.IngressRequestHandlerImpl; @@ -133,7 +134,7 @@ public void setup() throws ExecutionException, InterruptedException { new IngressRequestHandlerImpl( StrictRequestToRecordMapper.getInstance(), Metrics.getRegistry(), ((event, reference) -> null)), SECRET_VOLUME_PATH, - null); + mock(OIDCDiscoveryConfigListener.class)); vertx.deployVerticle(verticle).toCompletionStage().toCompletableFuture().get(); } diff --git a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactoryTest.java b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactoryTest.java index 6c6fb276a6..0f11735887 100644 --- a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactoryTest.java +++ b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactoryTest.java @@ -21,7 +21,7 @@ import dev.knative.eventing.kafka.broker.core.eventtype.EventType; import dev.knative.eventing.kafka.broker.core.metrics.Metrics; -import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfig; +import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfigListener; import dev.knative.eventing.kafka.broker.receiver.MockReactiveProducerFactory; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; @@ -56,7 +56,7 @@ public void shouldCreateMultipleReceiverVerticleInstances(Vertx vertx) throws No mockClient.resources(EventType.class), mock(SharedIndexInformer.class), vertx, - mock(OIDCDiscoveryConfig.class)); + mock(OIDCDiscoveryConfigListener.class)); assertThat(supplier.get()).isNotSameAs(supplier.get()); } diff --git a/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java b/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java index dd92bb3bd7..698a9c663d 100644 --- a/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java +++ b/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java @@ -32,6 +32,7 @@ import dev.knative.eventing.kafka.broker.core.eventbus.ContractMessageCodec; import dev.knative.eventing.kafka.broker.core.eventbus.ContractPublisher; import dev.knative.eventing.kafka.broker.core.metrics.Metrics; +import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfigListener; import dev.knative.eventing.kafka.broker.core.reconciler.impl.ResourcesReconcilerMessageHandler; import dev.knative.eventing.kafka.broker.core.security.AuthProvider; import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.CloudEventDeserializer; @@ -396,7 +397,7 @@ private ReceiverVerticle setUpReceiver(final Vertx vertx, final VertxTestContext Metrics.getRegistry(), (((event, reference) -> null))), SECRET_VOLUME_PATH, - null); + mock(OIDCDiscoveryConfigListener.class)); final CountDownLatch latch = new CountDownLatch(1); vertx.deployVerticle(verticle, context.succeeding(h -> latch.countDown()));