Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Customizable prefix for cloudevent extensions #4144

Draft
wants to merge 1 commit into
base: release-1.14
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@
public class CloudEventOverridesMutator implements CloudEventMutator {

private final DataPlaneContract.CloudEventOverrides cloudEventOverrides;
private final String ceMetadataExtensionPrefix;

public CloudEventOverridesMutator(final DataPlaneContract.CloudEventOverrides cloudEventOverrides) {
public CloudEventOverridesMutator(final DataPlaneContract.CloudEventOverrides cloudEventOverrides, final String ceMetadataExtensionPrefix) {
this.cloudEventOverrides = cloudEventOverrides;
this.ceMetadataExtensionPrefix = ceMetadataExtensionPrefix;
}

@Override
Expand All @@ -49,7 +51,7 @@ private void applyCloudEventOverrides(CloudEventBuilder builder) {
}

private void applyKafkaMetadata(CloudEventBuilder builder, Number partition, Number offset) {
builder.withExtension("knativekafkapartition", partition);
builder.withExtension("knativekafkaoffset", offset);
builder.withExtension(ceMetadataExtensionPrefix + "partition", partition);
builder.withExtension(ceMetadataExtensionPrefix + "offset", offset);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,9 @@ private void build(final Vertx vertx, final ConsumerVerticle consumerVerticle, f
TracingPolicy.PROPAGATE),
Metrics.getRegistry()),
new CloudEventOverridesMutator(
consumerVerticleContext.getResource().getCloudEventOverrides()));
consumerVerticleContext.getResource().getCloudEventOverrides(),
consumerVerticleContext.getCeMetadataExtensionPrefix()
));
consumerVerticle.setRecordDispatcher(recordDispatcher);

final var partitionRevokedHandlers =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public class ConsumerVerticleContext {
private ReactiveConsumerFactory<Object, CloudEvent> consumerFactory;
private ReactiveProducerFactory<String, CloudEvent> producerFactory;

private String ceMetadataExtensionPrefix;

private Integer maxPollRecords;
private static final int DEFAULT_MAX_POLL_RECORDS = 50;

Expand Down Expand Up @@ -163,6 +165,12 @@ public ConsumerVerticleContext withProducerFactory(
return this;
}

public ConsumerVerticleContext withCeMetadataExtensionPrefix(
final String ceMetadataExtensionPrefix) {
this.ceMetadataExtensionPrefix = ceMetadataExtensionPrefix;
return this;
}

public DataPlaneContract.Resource getResource() {
return resource;
}
Expand Down Expand Up @@ -230,6 +238,10 @@ public ReactiveProducerFactory<String, CloudEvent> getProducerFactory() {
return this.producerFactory;
}

public String getCeMetadataExtensionPrefix() {
return this.ceMetadataExtensionPrefix;
}

public Tags getTags() {
return tags;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class ConsumerVerticleFactoryImpl implements ConsumerVerticleFactory {
private final MeterRegistry metricsRegistry;
private final ReactiveConsumerFactory reactiveConsumerFactory;
private final ReactiveProducerFactory reactiveProducerFactory;
private final String ceMetadataExtensionPrefix;

/**
* All args constructor.
Expand All @@ -56,7 +57,8 @@ public ConsumerVerticleFactoryImpl(
final AuthProvider authProvider,
final MeterRegistry metricsRegistry,
final ReactiveConsumerFactory reactiveConsumerFactory,
final ReactiveProducerFactory reactiveProducerFactory) {
final ReactiveProducerFactory reactiveProducerFactory,
final String ceMetadataExtensionPrefix) {

Objects.requireNonNull(consumerConfigs, "provide consumerConfigs");
Objects.requireNonNull(webClientOptions, "provide webClientOptions");
Expand All @@ -75,6 +77,7 @@ public ConsumerVerticleFactoryImpl(
this.metricsRegistry = metricsRegistry;
this.reactiveConsumerFactory = reactiveConsumerFactory;
this.reactiveProducerFactory = reactiveProducerFactory;
this.ceMetadataExtensionPrefix = ceMetadataExtensionPrefix;
}

/**
Expand All @@ -93,7 +96,8 @@ public ConsumerVerticle get(final EgressContext egressContext) {
.withMeterRegistry(metricsRegistry)
.withResource(egressContext)
.withConsumerFactory(reactiveConsumerFactory)
.withProducerFactory(reactiveProducerFactory))
.withProducerFactory(reactiveProducerFactory)
.withCeMetadataExtensionPrefix(ceMetadataExtensionPrefix))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package dev.knative.eventing.kafka.broker.dispatcher.main;

import static java.util.Objects.requireNonNull;
import static java.util.Objects.requireNonNullElse;

import dev.knative.eventing.kafka.broker.core.utils.BaseEnv;
import java.util.function.Function;
Expand All @@ -31,12 +32,16 @@ public class DispatcherEnv extends BaseEnv {
public static final String EGRESSES_INITIAL_CAPACITY = "EGRESSES_INITIAL_CAPACITY";
private final int egressesInitialCapacity;

public static final String CE_METADATA_EXTENSION_PREFIX = "CE_METADATA_EXTENSION_PREFIX";
private final String ceMetadataExtensionPrefix;

public DispatcherEnv(Function<String, String> envProvider) {
super(envProvider);

this.consumerConfigFilePath = requireNonNull(envProvider.apply(CONSUMER_CONFIG_FILE_PATH));
this.webClientConfigFilePath = requireNonNull(envProvider.apply(WEBCLIENT_CONFIG_FILE_PATH));
this.egressesInitialCapacity = Integer.parseInt(requireNonNull(envProvider.apply(EGRESSES_INITIAL_CAPACITY)));
this.ceMetadataExtensionPrefix = requireNonNullElse(envProvider.apply(CE_METADATA_EXTENSION_PREFIX), "knativekafka");
}

public String getConsumerConfigFilePath() {
Expand All @@ -51,6 +56,10 @@ public int getEgressesInitialCapacity() {
return egressesInitialCapacity;
}

public String getCeMetadataExtensionPrefix() {
return ceMetadataExtensionPrefix;
}

@Override
public String toString() {
return "DispatcherEnv{" + "consumerConfigFilePath='"
Expand Down