Skip to content

Commit

Permalink
fix: do not re-deploy verticles
Browse files Browse the repository at this point in the history
Signed-off-by: Calum Murray <[email protected]>
  • Loading branch information
Cali0707 committed Jul 31, 2024
1 parent b107836 commit 123e6c1
Show file tree
Hide file tree
Showing 9 changed files with 194 additions and 217 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright © 2018 Knative Authors ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.core.oidc;

import dev.knative.eventing.kafka.broker.core.features.FeaturesConfig;
import dev.knative.eventing.kafka.broker.core.file.FileWatcher;
import io.vertx.core.Vertx;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OIDCDiscoveryConfigListener implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(OIDCDiscoveryConfigListener.class);
private final String featuresConfigPath;
private final Vertx vertx;
private final FileWatcher configFeaturesWatcher;
private final int timeoutSeconds;
private List<Consumer<OIDCDiscoveryConfig>> callbacks;
private OIDCDiscoveryConfig oidcDiscoveryConfig;

public OIDCDiscoveryConfigListener(String featuresConfigPath, Vertx vertx, int timeoutSeconds) throws IOException {
this.featuresConfigPath = featuresConfigPath;
this.vertx = vertx;
this.timeoutSeconds = timeoutSeconds;

this.oidcDiscoveryConfig = this.buildFeaturesAndOIDCDiscoveryConfig();

this.configFeaturesWatcher = new FileWatcher(new File(featuresConfigPath), () -> {
if (this.oidcDiscoveryConfig == null) {
this.oidcDiscoveryConfig = this.buildFeaturesAndOIDCDiscoveryConfig();
if (this.oidcDiscoveryConfig != null && this.callbacks != null) {
this.callbacks.stream().filter(Objects::nonNull).forEach(c -> c.accept(this.oidcDiscoveryConfig));
}
}
});

this.configFeaturesWatcher.start();
}

public OIDCDiscoveryConfig getOidcDiscoveryConfig() {
return oidcDiscoveryConfig;
}

public int registerCallback(Consumer<OIDCDiscoveryConfig> callback) {
if (this.callbacks == null) {
this.callbacks = new ArrayList<>();
}

this.callbacks.add(callback);

return this.callbacks.size();
}

public void deregisterCallback(int callbackId) {
this.callbacks.set(callbackId, null);
}

private OIDCDiscoveryConfig buildOIDCDiscoveryConfig()
throws ExecutionException, InterruptedException, TimeoutException {
this.oidcDiscoveryConfig = OIDCDiscoveryConfig.build(this.vertx)
.toCompletionStage()
.toCompletableFuture()
.get(this.timeoutSeconds, TimeUnit.SECONDS);

return this.oidcDiscoveryConfig;
}

private OIDCDiscoveryConfig buildFeaturesAndOIDCDiscoveryConfig() {
try {
FeaturesConfig featuresConfig = new FeaturesConfig(featuresConfigPath);
if (featuresConfig.isAuthenticationOIDC()) {
try {
return this.buildOIDCDiscoveryConfig();
} catch (ExecutionException | InterruptedException | TimeoutException e) {
logger.error("Unable to build OIDC Discover Config even though OIDC authentication is enabled", e);
}
}
} catch (IOException e) {
logger.warn("failed to get feature config, skipping building OIDC Discovery Config", e);
}

return null;
}

@Override
public void close() throws Exception {
this.configFeaturesWatcher.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import dev.knative.eventing.kafka.broker.core.file.FileWatcher;
import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfig;
import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfigListener;
import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifier;
import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifierImpl;
import dev.knative.eventing.kafka.broker.core.reconciler.IngressReconcilerListener;
Expand All @@ -34,7 +35,6 @@
import dev.knative.eventing.kafka.broker.receiver.impl.handler.MethodNotAllowedHandler;
import dev.knative.eventing.kafka.broker.receiver.impl.handler.ProbeHandler;
import dev.knative.eventing.kafka.broker.receiver.main.ReceiverEnv;
import io.fabric8.kubernetes.client.*;
import io.vertx.core.*;
import io.vertx.core.buffer.*;
import io.vertx.core.eventbus.MessageConsumer;
Expand Down Expand Up @@ -88,7 +88,8 @@ public class ReceiverVerticle extends AbstractVerticle implements Handler<HttpSe

private final IngressRequestHandler ingressRequestHandler;
private final ReceiverEnv env;
private final OIDCDiscoveryConfig oidcDiscoveryConfig;
private final OIDCDiscoveryConfigListener oidcDiscoveryConfigListener;
private int oidcDiscoveryCallbackId;

private AuthenticationHandler authenticationHandler;
private HttpServer httpServer;
Expand All @@ -104,7 +105,7 @@ public ReceiverVerticle(
final Function<Vertx, IngressProducerReconcilableStore> ingressProducerStoreFactory,
final IngressRequestHandler ingressRequestHandler,
final String secretVolumePath,
final OIDCDiscoveryConfig oidcDiscoveryConfig) {
final OIDCDiscoveryConfigListener oidcDiscoveryConfigListener) {

Objects.requireNonNull(env);
Objects.requireNonNull(httpServerOptions);
Expand All @@ -121,7 +122,7 @@ public ReceiverVerticle(
this.secretVolume = new File(secretVolumePath);
this.tlsKeyFile = new File(secretVolumePath + "/tls.key");
this.tlsCrtFile = new File(secretVolumePath + "/tls.crt");
this.oidcDiscoveryConfig = oidcDiscoveryConfig;
this.oidcDiscoveryConfigListener = oidcDiscoveryConfigListener;
}

public HttpServerOptions getHttpsServerOptions() {
Expand All @@ -135,8 +136,8 @@ public void start(final Promise<Void> startPromise) {
.watchIngress(IngressReconcilerListener.all(this.ingressProducerStore, this.ingressRequestHandler))
.buildAndListen(vertx);

TokenVerifier tokenVerifier = new TokenVerifierImpl(vertx, oidcDiscoveryConfig);
this.authenticationHandler = new AuthenticationHandler(tokenVerifier);
this.buildAuthHandler(oidcDiscoveryConfigListener.getOidcDiscoveryConfig());
this.oidcDiscoveryCallbackId = this.oidcDiscoveryConfigListener.registerCallback(this::buildAuthHandler);

this.httpServer = vertx.createHttpServer(this.httpServerOptions);

Expand Down Expand Up @@ -181,6 +182,11 @@ public void start(final Promise<Void> startPromise) {
setupSecretWatcher();
}

private void buildAuthHandler(OIDCDiscoveryConfig config) {
TokenVerifier tokenVerifier = new TokenVerifierImpl(vertx, config);
this.authenticationHandler = new AuthenticationHandler(tokenVerifier);
}

// Set up the secret watcher
private void setupSecretWatcher() {
try {
Expand All @@ -200,6 +206,8 @@ public void stop(Promise<Void> stopPromise) throws Exception {
.<Void>mapEmpty()
.onComplete(stopPromise);

this.oidcDiscoveryConfigListener.deregisterCallback(this.oidcDiscoveryCallbackId);

// close the watcher
if (this.secretWatcher != null) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,38 @@

import dev.knative.eventing.kafka.broker.core.ReactiveProducerFactory;
import dev.knative.eventing.kafka.broker.core.eventbus.ContractMessageCodec;
import dev.knative.eventing.kafka.broker.core.eventbus.ContractPublisher;
import dev.knative.eventing.kafka.broker.core.eventtype.EventType;
import dev.knative.eventing.kafka.broker.core.features.FeaturesConfig;
import dev.knative.eventing.kafka.broker.core.file.FileWatcher;
import dev.knative.eventing.kafka.broker.core.metrics.Metrics;
import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfigListener;
import dev.knative.eventing.kafka.broker.core.reconciler.impl.ResourcesReconcilerMessageHandler;
import dev.knative.eventing.kafka.broker.core.tracing.TracingConfig;
import dev.knative.eventing.kafka.broker.core.utils.Configurations;
import dev.knative.eventing.kafka.broker.core.utils.Shutdown;
import io.cloudevents.kafka.CloudEventSerializer;
import io.cloudevents.kafka.PartitionKeyExtensionInterceptor;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Verticle;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.tracing.TracingPolicy;
import io.vertx.tracing.opentelemetry.OpenTelemetryOptions;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
Expand Down Expand Up @@ -115,17 +127,52 @@ public static void start(final String[] args, final ReactiveProducerFactory kafk
informerException);
}

final ReceiverDeployer receiverDeployer = new ReceiverDeployer(
kafkaProducerFactory,
env,
producerConfigs,
OIDCDiscoveryConfigListener oidcDiscoveryConfigListener = new OIDCDiscoveryConfigListener(
env.getConfigFeaturesPath() + "/" + FeaturesConfig.KEY_AUTHENTICATION_OIDC,
vertx,
httpServerOptions,
httpsServerOptions,
eventTypeClient,
eventTypeInformer,
openTelemetry);
env.getWaitStartupSeconds());

receiverDeployer.run();
try {
final Supplier<Verticle> receiverVerticleFactory = new ReceiverVerticleFactory(
env,
producerConfigs,
Metrics.getRegistry(),
httpServerOptions,
httpsServerOptions,
kafkaProducerFactory,
eventTypeClient,
eventTypeInformer,
vertx,
oidcDiscoveryConfigListener);
DeploymentOptions deploymentOptions =
new DeploymentOptions().setInstances(Runtime.getRuntime().availableProcessors());
// Deploy the receiver verticles
vertx.deployVerticle(receiverVerticleFactory, deploymentOptions)
.toCompletionStage()
.toCompletableFuture()
.get(env.getWaitStartupSeconds(), TimeUnit.SECONDS);
logger.info("Receiver started");

ContractPublisher publisher =
new ContractPublisher(vertx.eventBus(), ResourcesReconcilerMessageHandler.ADDRESS);
File file = new File(env.getDataPlaneConfigFilePath());
FileWatcher fileWatcher = new FileWatcher(file, () -> publisher.updateContract(file));
fileWatcher.start();

var closeables = new ArrayList<>(Arrays.asList(
publisher, fileWatcher, openTelemetry.getSdkTracerProvider(), oidcDiscoveryConfigListener));

if (eventTypeInformer != null) {
closeables.add(eventTypeInformer);
}

// Register shutdown hook for graceful shutdown.
Shutdown.registerHook(vertx, closeables.toArray(new AutoCloseable[0]));

} catch (final Exception ex) {
logger.error("Failed to startup the receiver", ex);
Shutdown.closeVertxSync(vertx);
System.exit(1);
}
}
}
Loading

0 comments on commit 123e6c1

Please sign in to comment.