Skip to content

Commit

Permalink
fix: do not build OIDC config unless enabled (#4021) (#4055)
Browse files Browse the repository at this point in the history
* fix: do not build OIDC config unless enabled



* feat: receiver redeploys verticles when oidc feature changes



* feat: the control plane ensures receiver restarts

When config-features changes, the control plane
sets a annotation on the receiver pods so that
the configmap update is reconciled by k8s



* mvn spotless:apply



* cleanup: goimports



* fix: do not re-deploy verticles



* fix: features config paths are now correct



* fix java unit tests



* mvn spotless:apply



* address review comments



---------

Signed-off-by: Calum Murray <[email protected]>
  • Loading branch information
Cali0707 authored Aug 15, 2024
1 parent 9730d52 commit d641892
Show file tree
Hide file tree
Showing 19 changed files with 208 additions and 79 deletions.
35 changes: 23 additions & 12 deletions control-plane/pkg/reconciler/base/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package base
import (
"context"
"fmt"
"strconv"
"time"

"go.uber.org/zap"
"google.golang.org/protobuf/encoding/protojson"
Expand Down Expand Up @@ -67,6 +67,9 @@ const (
// volume generation annotation data plane pods.
VolumeGenerationAnnotationKey = "volumeGeneration"

// config features update time annotation for data plane pods.
ConfigFeaturesUpdatedAnnotationKey = "configFeaturesUpdatedAt"

Protobuf = "protobuf"
Json = "json"
)
Expand Down Expand Up @@ -281,23 +284,31 @@ func (r *Reconciler) UpdateDataPlaneConfigMap(ctx context.Context, contract *con
return nil
}

func (r *Reconciler) UpdateDispatcherPodsAnnotation(ctx context.Context, logger *zap.Logger, volumeGeneration uint64) error {
func (r *Reconciler) UpdateDispatcherPodsContractGenerationAnnotation(ctx context.Context, logger *zap.Logger, volumeGeneration uint64) error {
pods, errors := r.PodLister.Pods(r.DataPlaneNamespace).List(r.dispatcherSelector())
if errors != nil {
return fmt.Errorf("failed to list dispatcher pods in namespace %s: %w", r.DataPlaneNamespace, errors)
}
return r.UpdatePodsAnnotation(ctx, logger, "dispatcher", volumeGeneration, pods)
return r.UpdatePodsAnnotation(ctx, logger, "dispatcher", VolumeGenerationAnnotationKey, fmt.Sprint(volumeGeneration), pods)
}

func (r *Reconciler) UpdateReceiverPodsAnnotation(ctx context.Context, logger *zap.Logger, volumeGeneration uint64) error {
func (r *Reconciler) UpdateReceiverPodsContractGenerationAnnotation(ctx context.Context, logger *zap.Logger, volumeGeneration uint64) error {
pods, errors := r.PodLister.Pods(r.DataPlaneNamespace).List(r.ReceiverSelector())
if errors != nil {
return fmt.Errorf("failed to list receiver pods in namespace %s: %w", r.DataPlaneNamespace, errors)
}
return r.UpdatePodsAnnotation(ctx, logger, "receiver", volumeGeneration, pods)
return r.UpdatePodsAnnotation(ctx, logger, "receiver", VolumeGenerationAnnotationKey, fmt.Sprint(volumeGeneration), pods)
}

func (r *Reconciler) UpdateReceiverConfigFeaturesUpdatedAnnotation(ctx context.Context, logger *zap.Logger) error {
pods, err := r.PodLister.Pods(r.DataPlaneNamespace).List(r.ReceiverSelector())
if err != nil {
return fmt.Errorf("failed to list receiver pods in namespace %s: %s", r.DataPlaneNamespace, err)
}
return r.UpdatePodsAnnotation(ctx, logger, "receiver", ConfigFeaturesUpdatedAnnotationKey, time.Now().String(), pods)
}

func (r *Reconciler) UpdatePodsAnnotation(ctx context.Context, logger *zap.Logger, component string, volumeGeneration uint64, pods []*corev1.Pod) error {
func (r *Reconciler) UpdatePodsAnnotation(ctx context.Context, logger *zap.Logger, component, annotationKey, annotationValue string, pods []*corev1.Pod) error {

var errors error

Expand All @@ -306,7 +317,7 @@ func (r *Reconciler) UpdatePodsAnnotation(ctx context.Context, logger *zap.Logge
logger.Debug(
"Update "+component+" pod annotation",
zap.String("pod", fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)),
zap.Uint64("volumeGeneration", volumeGeneration),
zap.String(annotationKey, annotationValue),
)

// do not update cache copy
Expand All @@ -318,15 +329,15 @@ func (r *Reconciler) UpdatePodsAnnotation(ctx context.Context, logger *zap.Logge
}

// Check whether pod's annotation is the expected one.
if v, ok := annotations[VolumeGenerationAnnotationKey]; ok {
v, err := strconv.ParseUint(v /* base */, 10 /* bitSize */, 64)
if err == nil && v == volumeGeneration {
// Volume generation already matches the expected volume generation number.
if v, ok := annotations[annotationKey]; ok {
if v == annotationValue {
logger.Debug(component + " pod annotation already up to date")
// annotation is already correct.
continue
}
}

annotations[VolumeGenerationAnnotationKey] = fmt.Sprint(volumeGeneration)
annotations[annotationKey] = annotationValue
pod.SetAnnotations(annotations)

if _, err := r.KubeClient.CoreV1().Pods(pod.Namespace).Update(ctx, pod, metav1.UpdateOptions{}); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions control-plane/pkg/reconciler/base/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func TestUpdateReceiverPodAnnotation(t *testing.T) {
ReceiverLabel: base.SinkReceiverLabel,
}

err := r.UpdateReceiverPodsAnnotation(ctx, logging.FromContext(ctx).Desugar(), 1)
err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logging.FromContext(ctx).Desugar(), 1)
require.Nil(t, err)
}

Expand All @@ -247,7 +247,7 @@ func TestUpdateDispatcherPodAnnotation(t *testing.T) {
DispatcherLabel: label,
}

err := r.UpdateDispatcherPodsAnnotation(ctx, logging.FromContext(ctx).Desugar(), 1)
err := r.UpdateDispatcherPodsContractGenerationAnnotation(ctx, logging.FromContext(ctx).Desugar(), 1)
require.Nil(t, err)
}

Expand Down
8 changes: 4 additions & 4 deletions control-plane/pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker)
// the update even if here eventually means seconds or minutes after the actual update.

// Update volume generation annotation of receiver pods
if err := r.UpdateReceiverPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
logger.Error("Failed to update receiver pod annotation", zap.Error(
statusConditionManager.FailedToUpdateReceiverPodsAnnotation(err),
))
Expand All @@ -217,7 +217,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker)
logger.Debug("Updated receiver pod annotation")

// Update volume generation annotation of dispatcher pods
if err := r.UpdateDispatcherPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateDispatcherPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
// Failing to update dispatcher pods annotation leads to config map refresh delayed by several seconds.
// Since the dispatcher side is the consumer side, we don't lose availability, and we can consider the Broker
// ready. So, log out the error and move on to the next step.
Expand Down Expand Up @@ -496,11 +496,11 @@ func (r *Reconciler) deleteResourceFromContractConfigMap(ctx context.Context, lo
// Note: if there aren't changes to be done at the pod annotation level, we just skip the update.

// Update volume generation annotation of receiver pods
if err := r.UpdateReceiverPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
return err
}
// Update volume generation annotation of dispatcher pods
if err := r.UpdateDispatcherPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateDispatcherPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
return err
}

Expand Down
4 changes: 4 additions & 0 deletions control-plane/pkg/reconciler/broker/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ func NewController(ctx context.Context, watcher configmap.Watcher, env *config.E
if globalResync != nil {
globalResync(obj)
}
err = reconciler.UpdateReceiverConfigFeaturesUpdatedAnnotation(ctx, logger.Desugar())
if err != nil {
logger.Warn("config-features updated, but the receiver pods were not successfully annotated. This may lead to features not working as expected.", zap.Error(err))
}
})
featureStore.WatchConfigs(watcher)

Expand Down
4 changes: 2 additions & 2 deletions control-plane/pkg/reconciler/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta
// the update even if here eventually means seconds or minutes after the actual update.

// Update volume generation annotation of receiver pods
if err := r.UpdateReceiverPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
logger.Error("Failed to update receiver pod annotation", zap.Error(
statusConditionManager.FailedToUpdateReceiverPodsAnnotation(err),
))
Expand Down Expand Up @@ -415,7 +415,7 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, channel *messagingv1beta1
// Note: if there aren't changes to be done at the pod annotation level, we just skip the update.

// Update volume generation annotation of receiver pods
if err := r.UpdateReceiverPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
return err
}

Expand Down
4 changes: 4 additions & 0 deletions control-plane/pkg/reconciler/channel/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
if globalResync != nil {
globalResync(nil)
}
err = reconciler.UpdateReceiverConfigFeaturesUpdatedAnnotation(ctx, logger.Desugar())
if err != nil {
logger.Warn("config-features updated, but the receiver pods were not successfully annotated. This may lead to features not working as expected.", zap.Error(err))
}
})
featureStore.WatchConfigs(watcher)

Expand Down
2 changes: 1 addition & 1 deletion control-plane/pkg/reconciler/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ func (r *Reconciler) schedule(ctx context.Context, logger *zap.Logger, c *kafkai
return false, err
}

return true, b.UpdatePodsAnnotation(ctx, logger, "dispatcher" /* component, for logging */, ct.Generation, []*corev1.Pod{p})
return true, b.UpdatePodsAnnotation(ctx, logger, "dispatcher" /* component, for logging */, base.VolumeGenerationAnnotationKey, fmt.Sprint(ct.Generation), []*corev1.Pod{p})
}

func (r *Reconciler) commonReconciler(p *corev1.Pod, cmName string) base.Reconciler {
Expand Down
4 changes: 4 additions & 0 deletions control-plane/pkg/reconciler/sink/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
if globalResync != nil {
globalResync(obj)
}
err = reconciler.UpdateReceiverConfigFeaturesUpdatedAnnotation(ctx, logger.Desugar())
if err != nil {
logger.Warn("config-features updated, but the receiver pods were not successfully annotated. This may lead to features not working as expected.", zap.Error(err))
}
})
featureStore.WatchConfigs(watcher)

Expand Down
4 changes: 2 additions & 2 deletions control-plane/pkg/reconciler/sink/kafka_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink)
// receivers haven't got the Sink, so update failures to receiver pods is a hard failure.

// Update volume generation annotation of receiver pods
if err := r.UpdateReceiverPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
return err
}

Expand Down Expand Up @@ -341,7 +341,7 @@ func (r *Reconciler) finalizeKind(ctx context.Context, ks *eventing.KafkaSink) e
// Note: if there aren't changes to be done at the pod annotation level, we just skip the update.

// Update volume generation annotation of receiver pods
if err := r.UpdateReceiverPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
return err
}

Expand Down
4 changes: 2 additions & 2 deletions control-plane/pkg/reconciler/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, trigger *eventing.Trigge
}

// Update volume generation annotation of dispatcher pods
if err := r.UpdateDispatcherPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateDispatcherPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
// Failing to update dispatcher pods annotation leads to config map refresh delayed by several seconds.
// Since the dispatcher side is the consumer side, we don't lose availability, and we can consider the Trigger
// ready. So, log out the error and move on to the next step.
Expand Down Expand Up @@ -289,7 +289,7 @@ func (r *Reconciler) finalizeKind(ctx context.Context, trigger *eventing.Trigger
logger.Debug("Updated data plane config map", zap.String("configmap", r.Env.DataPlaneConfigMapAsString()))

// Update volume generation annotation of dispatcher pods
if err := r.UpdateDispatcherPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateDispatcherPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
// Failing to update dispatcher pods annotation leads to config map refresh delayed by several seconds.
// The delete trigger will eventually be seen by the data plane pods, so log out the error and move on to the
// next step.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class FileWatcher implements AutoCloseable {
/**
* All args constructor.
*
* @param contractConsumer updates receiver.
* @param triggerFunction is triggered whenever there is a file change.
* @param file file to watch
*/
public FileWatcher(File file, Runnable triggerFunction) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright © 2018 Knative Authors ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.core.oidc;

import dev.knative.eventing.kafka.broker.core.features.FeaturesConfig;
import dev.knative.eventing.kafka.broker.core.file.FileWatcher;
import io.vertx.core.Vertx;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OIDCDiscoveryConfigListener implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(OIDCDiscoveryConfigListener.class);
private final String featuresConfigPath;
private final Vertx vertx;
private final FileWatcher configFeaturesWatcher;
private final int timeoutSeconds;
private List<Consumer<OIDCDiscoveryConfig>> callbacks;
private OIDCDiscoveryConfig oidcDiscoveryConfig;

public OIDCDiscoveryConfigListener(String featuresConfigPath, Vertx vertx, int timeoutSeconds) throws IOException {
this.featuresConfigPath = featuresConfigPath;
this.vertx = vertx;
this.timeoutSeconds = timeoutSeconds;

this.buildFeaturesAndOIDCDiscoveryConfig();

this.configFeaturesWatcher =
new FileWatcher(new File(featuresConfigPath + "/" + FeaturesConfig.KEY_AUTHENTICATION_OIDC), () -> {
if (this.oidcDiscoveryConfig == null) {
this.buildFeaturesAndOIDCDiscoveryConfig();
if (this.oidcDiscoveryConfig != null && this.callbacks != null) {
this.callbacks.stream()
.filter(Objects::nonNull)
.forEach(c -> c.accept(this.oidcDiscoveryConfig));
}
}
});

this.configFeaturesWatcher.start();
}

public OIDCDiscoveryConfig getOidcDiscoveryConfig() {
return oidcDiscoveryConfig;
}

public int registerCallback(Consumer<OIDCDiscoveryConfig> callback) {
if (this.callbacks == null) {
this.callbacks = new ArrayList<>();
}

this.callbacks.add(callback);
return this.callbacks.size() - 1;
}

public void deregisterCallback(int callbackId) {
this.callbacks.set(callbackId, null);
}

private void buildOIDCDiscoveryConfig() throws ExecutionException, InterruptedException, TimeoutException {
this.oidcDiscoveryConfig = OIDCDiscoveryConfig.build(this.vertx)
.toCompletionStage()
.toCompletableFuture()
.get(this.timeoutSeconds, TimeUnit.SECONDS);
}

private void buildFeaturesAndOIDCDiscoveryConfig() {
try {
FeaturesConfig featuresConfig = new FeaturesConfig(featuresConfigPath);
if (featuresConfig.isAuthenticationOIDC()) {
try {
this.buildOIDCDiscoveryConfig();
} catch (ExecutionException | InterruptedException | TimeoutException e) {
logger.error("Unable to build OIDC Discover Config even though OIDC authentication is enabled", e);
}
}
} catch (IOException e) {
logger.warn("failed to get feature config, skipping building OIDC Discovery Config", e);
}
}

@Override
public void close() throws Exception {
this.configFeaturesWatcher.close();
}
}
Loading

0 comments on commit d641892

Please sign in to comment.