diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutator.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutator.java index ccfb0edbf4..8146121664 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutator.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutator.java @@ -28,9 +28,11 @@ public class CloudEventOverridesMutator implements CloudEventMutator { private final DataPlaneContract.CloudEventOverrides cloudEventOverrides; + private final String ceMetadataExtensionPrefix; - public CloudEventOverridesMutator(final DataPlaneContract.CloudEventOverrides cloudEventOverrides) { + public CloudEventOverridesMutator(final DataPlaneContract.CloudEventOverrides cloudEventOverrides, final String ceMetadataExtensionPrefix) { this.cloudEventOverrides = cloudEventOverrides; + this.ceMetadataExtensionPrefix = ceMetadataExtensionPrefix; } @Override @@ -49,7 +51,7 @@ private void applyCloudEventOverrides(CloudEventBuilder builder) { } private void applyKafkaMetadata(CloudEventBuilder builder, Number partition, Number offset) { - builder.withExtension("knativekafkapartition", partition); - builder.withExtension("knativekafkaoffset", offset); + builder.withExtension(ceMetadataExtensionPrefix + "partition", partition); + builder.withExtension(ceMetadataExtensionPrefix + "offset", offset); } } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleBuilder.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleBuilder.java index 230993df7d..561f9e8521 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleBuilder.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleBuilder.java @@ -122,7 +122,9 @@ private void build(final Vertx vertx, final ConsumerVerticle consumerVerticle, f TracingPolicy.PROPAGATE), Metrics.getRegistry()), new CloudEventOverridesMutator( - consumerVerticleContext.getResource().getCloudEventOverrides())); + consumerVerticleContext.getResource().getCloudEventOverrides(), + consumerVerticleContext.getCeMetadataExtensionPrefix() + )); consumerVerticle.setRecordDispatcher(recordDispatcher); final var partitionRevokedHandlers = diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleContext.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleContext.java index 1803a84e21..da95f381a1 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleContext.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleContext.java @@ -61,6 +61,8 @@ public class ConsumerVerticleContext { private ReactiveConsumerFactory consumerFactory; private ReactiveProducerFactory producerFactory; + private String ceMetadataExtensionPrefix; + private Integer maxPollRecords; private static final int DEFAULT_MAX_POLL_RECORDS = 50; @@ -163,6 +165,12 @@ public ConsumerVerticleContext withProducerFactory( return this; } + public ConsumerVerticleContext withCeMetadataExtensionPrefix( + final String ceMetadataExtensionPrefix) { + this.ceMetadataExtensionPrefix = ceMetadataExtensionPrefix; + return this; + } + public DataPlaneContract.Resource getResource() { return resource; } @@ -230,6 +238,10 @@ public ReactiveProducerFactory getProducerFactory() { return this.producerFactory; } + public String getCeMetadataExtensionPrefix() { + return this.ceMetadataExtensionPrefix; + } + public Tags getTags() { return tags; } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleFactoryImpl.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleFactoryImpl.java index 2b4289d264..959ee7cdf8 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleFactoryImpl.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleFactoryImpl.java @@ -39,6 +39,7 @@ public class ConsumerVerticleFactoryImpl implements ConsumerVerticleFactory { private final MeterRegistry metricsRegistry; private final ReactiveConsumerFactory reactiveConsumerFactory; private final ReactiveProducerFactory reactiveProducerFactory; + private final String ceMetadataExtensionPrefix; /** * All args constructor. @@ -56,7 +57,8 @@ public ConsumerVerticleFactoryImpl( final AuthProvider authProvider, final MeterRegistry metricsRegistry, final ReactiveConsumerFactory reactiveConsumerFactory, - final ReactiveProducerFactory reactiveProducerFactory) { + final ReactiveProducerFactory reactiveProducerFactory, + final String ceMetadataExtensionPrefix) { Objects.requireNonNull(consumerConfigs, "provide consumerConfigs"); Objects.requireNonNull(webClientOptions, "provide webClientOptions"); @@ -75,6 +77,7 @@ public ConsumerVerticleFactoryImpl( this.metricsRegistry = metricsRegistry; this.reactiveConsumerFactory = reactiveConsumerFactory; this.reactiveProducerFactory = reactiveProducerFactory; + this.ceMetadataExtensionPrefix = ceMetadataExtensionPrefix; } /** @@ -93,7 +96,8 @@ public ConsumerVerticle get(final EgressContext egressContext) { .withMeterRegistry(metricsRegistry) .withResource(egressContext) .withConsumerFactory(reactiveConsumerFactory) - .withProducerFactory(reactiveProducerFactory)) + .withProducerFactory(reactiveProducerFactory) + .withCeMetadataExtensionPrefix(ceMetadataExtensionPrefix)) .build(); } } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/DispatcherEnv.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/DispatcherEnv.java index 6436d4253b..b3e7a007a2 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/DispatcherEnv.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/DispatcherEnv.java @@ -16,6 +16,7 @@ package dev.knative.eventing.kafka.broker.dispatcher.main; import static java.util.Objects.requireNonNull; +import static java.util.Objects.requireNonNullElse; import dev.knative.eventing.kafka.broker.core.utils.BaseEnv; import java.util.function.Function; @@ -31,12 +32,16 @@ public class DispatcherEnv extends BaseEnv { public static final String EGRESSES_INITIAL_CAPACITY = "EGRESSES_INITIAL_CAPACITY"; private final int egressesInitialCapacity; + public static final String CE_METADATA_EXTENSION_PREFIX = "CE_METADATA_EXTENSION_PREFIX"; + private final String ceMetadataExtensionPrefix; + public DispatcherEnv(Function envProvider) { super(envProvider); this.consumerConfigFilePath = requireNonNull(envProvider.apply(CONSUMER_CONFIG_FILE_PATH)); this.webClientConfigFilePath = requireNonNull(envProvider.apply(WEBCLIENT_CONFIG_FILE_PATH)); this.egressesInitialCapacity = Integer.parseInt(requireNonNull(envProvider.apply(EGRESSES_INITIAL_CAPACITY))); + this.ceMetadataExtensionPrefix = requireNonNullElse(envProvider.apply(CE_METADATA_EXTENSION_PREFIX), "knativekafka"); } public String getConsumerConfigFilePath() { @@ -51,6 +56,10 @@ public int getEgressesInitialCapacity() { return egressesInitialCapacity; } + public String getCeMetadataExtensionPrefix() { + return ceMetadataExtensionPrefix; + } + @Override public String toString() { return "DispatcherEnv{" + "consumerConfigFilePath='"