Skip to content

Commit

Permalink
Prep work for registering schemas
Browse files Browse the repository at this point in the history
Wire in the ability for a serde provider to 'ensure' the schemas of owned topics are registered. This is called during service initialisation.

This will be used when JSON schema serde is added. (#25).
  • Loading branch information
big-andy-coates committed Nov 22, 2023
1 parent 841360a commit bb0c3a5
Show file tree
Hide file tree
Showing 15 changed files with 326 additions and 66 deletions.
1 change: 1 addition & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ jobs:
run: |
./gradlew -Dgradle.publish.key="$GRADLE_PUBLISH_KEY" -Dgradle.publish.secret="$GRADLE_PUBLISH_SECRET" publishPlugins
# Until Creek fully supports Windows, minimal check:
build_windows:
runs-on: windows-latest
steps:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.creekservice.api.base.annotation.VisibleForTesting;
import org.creekservice.api.kafka.extension.client.TopicClient;
import org.creekservice.api.kafka.extension.config.ClustersProperties;
import org.creekservice.api.kafka.serde.provider.KafkaSerdeProviders;
import org.creekservice.api.platform.metadata.ComponentDescriptor;
import org.creekservice.api.service.extension.CreekExtensionProvider;
import org.creekservice.api.service.extension.CreekService;
Expand All @@ -47,12 +48,16 @@ public final class KafkaClientsExtensionProvider

/** Constructor */
public KafkaClientsExtensionProvider() {
this(KafkaSerdeProviders.create());
}

private KafkaClientsExtensionProvider(final KafkaSerdeProviders serdeProviders) {
this(
new KafkaResourceValidator(),
new ClustersPropertiesFactory(),
KafkaTopicClient::new,
props -> new KafkaTopicClient(props, serdeProviders),
ClientsExtension::new,
new ResourceRegistryFactory());
new ResourceRegistryFactory(serdeProviders));
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static java.util.stream.Collectors.toList;
import static org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG;

import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -36,34 +37,51 @@
import org.creekservice.api.base.annotation.VisibleForTesting;
import org.creekservice.api.kafka.extension.client.TopicClient;
import org.creekservice.api.kafka.extension.config.ClustersProperties;
import org.creekservice.api.kafka.extension.logging.LoggingField;
import org.creekservice.api.kafka.metadata.CreatableKafkaTopic;
import org.creekservice.api.kafka.metadata.KafkaTopicDescriptor;
import org.creekservice.api.kafka.metadata.KafkaTopicDescriptor.PartDescriptor;
import org.creekservice.api.kafka.serde.provider.KafkaSerdeProvider;
import org.creekservice.api.kafka.serde.provider.KafkaSerdeProvider.TopicPart;
import org.creekservice.api.kafka.serde.provider.KafkaSerdeProviders;
import org.creekservice.api.observability.logging.structured.LogEntryCustomizer;
import org.creekservice.api.observability.logging.structured.StructuredLogger;
import org.creekservice.api.observability.logging.structured.StructuredLoggerFactory;

/** Implementation of {@link TopicClient} */
/**
* Implementation of {@link TopicClient}.
*
* <p>Responsible for ensuring topics are created, along with any associated resources, e.g.
* schemas.
*/
public final class KafkaTopicClient implements TopicClient {

private final StructuredLogger logger;
private final ClustersProperties clusterProps;
private final KafkaSerdeProviders serdeProviders;
private final Function<Map<String, Object>, Admin> adminFactory;

/**
* @param clusterProps props
     * @param serdeProviders all known serde providers
*/
public KafkaTopicClient(final ClustersProperties clusterProps) {
public KafkaTopicClient(
final ClustersProperties clusterProps, final KafkaSerdeProviders serdeProviders) {
this(
clusterProps,
serdeProviders,
Admin::create,
StructuredLoggerFactory.internalLogger(KafkaTopicClient.class));
}

@VisibleForTesting
KafkaTopicClient(
final ClustersProperties clusterProps,
final KafkaSerdeProviders serdeProviders,
final Function<Map<String, Object>, Admin> adminFactory,
final StructuredLogger logger) {
this.clusterProps = requireNonNull(clusterProps, "clusterProps");
this.serdeProviders = requireNonNull(serdeProviders, "serdeProviders");
this.adminFactory = requireNonNull(adminFactory, "adminFactory");
this.logger = requireNonNull(logger, "logger");
}
Expand All @@ -76,11 +94,38 @@ public void ensure(final List<? extends CreatableKafkaTopic<?, ?>> topics) {
}

private void ensure(final String cluster, final List<CreatableKafkaTopic<?, ?>> topics) {
logger.info(
ensureTopicResources(topics);
ensureTopics(cluster, topics);
}

private void ensureTopicResources(final List<CreatableKafkaTopic<?, ?>> topics) {
topics.forEach(
topic -> {
logger.debug(
"Ensuring topic resources",
log -> log.with(LoggingField.topicId, topic.id()));

ensureTopicPartResources(topic, TopicPart.key);
ensureTopicPartResources(topic, TopicPart.value);
});
}

private void ensureTopicPartResources(
final CreatableKafkaTopic<?, ?> topic, final TopicPart topicPart) {
final PartDescriptor<?> part =
topicPart.equals(TopicPart.key) ? topic.key() : topic.value();
final KafkaSerdeProvider serdeProvider = serdeProviders.get(part.format());

serdeProvider.ensureTopicPartResources(
part, topicPart, topic, clusterProps.get(topic.cluster()));
}

private void ensureTopics(final String cluster, final List<CreatableKafkaTopic<?, ?>> topics) {
logger.debug(
"Ensuring topics",
log ->
log.with(
"topic-ids",
LoggingField.topicIds,
topics.stream().map(CreatableKafkaTopic::id).collect(toList())));

try (Admin admin = adminFactory.apply(clusterProps.get(cluster))) {
Expand All @@ -104,6 +149,7 @@ private void create(
final Consumer<Map.Entry<String, KafkaFuture<Void>>> throwOnFailure =
e -> {
final String topic = e.getKey();
final URI topicId = KafkaTopicDescriptor.resourceId(cluster, topic);
try {
e.getValue().get();

Expand All @@ -117,20 +163,21 @@ private void create(
"Created topic",
log -> {
final LogEntryCustomizer configNs =
log.with("cluster", cluster)
.with("name", topic)
.with("partitions", partitions)
log.with(LoggingField.topicId, topicId)
.with(LoggingField.partitions, partitions)
.ns("config");

config.forEach(c -> configNs.with(c.name(), c.value()));
});
} catch (ExecutionException ex) {
if (!(ex.getCause() instanceof TopicExistsException)) {
throw new CreateTopicException(topic, cluster, ex.getCause());
throw new CreateTopicException(topicId, ex.getCause());
}
logger.debug("Topic already exists", log -> log.with("nane", topic));
logger.debug(
"Topic already exists",
log -> log.with(LoggingField.topicId, topicId));
} catch (Exception ex) {
throw new CreateTopicException(topic, cluster, ex);
throw new CreateTopicException(topicId, ex);
}
};

Expand All @@ -146,11 +193,8 @@ private static NewTopic toNewTopic(final CreatableKafkaTopic<?, ?> descriptor) {
}

private static final class CreateTopicException extends RuntimeException {
CreateTopicException(
final String topicName, final String clusterName, final Throwable cause) {
super(
"Failed to create topic. topic: " + topicName + ", cluster: " + clusterName,
cause);
CreateTopicException(final URI topicId, final Throwable cause) {
super("Failed to create topic. " + LoggingField.topicId + ": " + topicId, cause);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@
import org.apache.kafka.common.serialization.Serde;
import org.creekservice.api.base.annotation.VisibleForTesting;
import org.creekservice.api.kafka.extension.config.ClustersProperties;
import org.creekservice.api.kafka.extension.logging.LoggingField;
import org.creekservice.api.kafka.metadata.CreatableKafkaTopic;
import org.creekservice.api.kafka.metadata.KafkaTopicDescriptor;
import org.creekservice.api.kafka.metadata.SerializationFormat;
import org.creekservice.api.kafka.serde.provider.KafkaSerdeProvider;
import org.creekservice.api.kafka.serde.provider.KafkaSerdeProvider.TopicPart;
import org.creekservice.api.kafka.serde.provider.KafkaSerdeProviders;
import org.creekservice.api.platform.metadata.ComponentDescriptor;
import org.creekservice.internal.kafka.extension.resource.TopicCollector.CollectedTopics;
Expand All @@ -42,9 +44,13 @@ public final class ResourceRegistryFactory {
private final RegistryFactory registryFactory;
private final TopicFactory topicFactory;

/** Constructor. */
public ResourceRegistryFactory() {
this(KafkaSerdeProviders.create(), new TopicCollector(), ResourceRegistry::new, Topic::new);
/**
* Constructor.
*
* @param serdeProviders all known serde providers.
*/
public ResourceRegistryFactory(final KafkaSerdeProviders serdeProviders) {
this(serdeProviders, new TopicCollector(), ResourceRegistry::new, Topic::new);
}

@VisibleForTesting
Expand Down Expand Up @@ -97,31 +103,31 @@ public ResourceRegistry create(
private <K, V> Topic<K, V> createTopicResource(
final KafkaTopicDescriptor<K, V> def, final ClustersProperties allProperties) {
final Map<String, Object> properties = allProperties.get(def.cluster());
final Serde<K> keySerde = serde(def.key(), def.name(), true, properties);
final Serde<V> valueSerde = serde(def.value(), def.name(), false, properties);
final Serde<K> keySerde = serde(def.key(), def.id(), TopicPart.key, properties);
final Serde<V> valueSerde = serde(def.value(), def.id(), TopicPart.value, properties);
return topicFactory.create(def, keySerde, valueSerde);
}

private <T> Serde<T> serde(
final KafkaTopicDescriptor.PartDescriptor<T> part,
final String topicName,
final boolean isKey,
final URI topicId,
final TopicPart topicPart,
final Map<String, Object> clusterProperties) {
final KafkaSerdeProvider provider = provider(part, topicName, isKey);
final KafkaSerdeProvider provider = provider(part, topicId, topicPart);

final Serde<T> serde = provider.create(part);
serde.configure(clusterProperties, isKey);
serde.configure(clusterProperties, topicPart.equals(TopicPart.key));
return serde;
}

private <T> KafkaSerdeProvider provider(
final KafkaTopicDescriptor.PartDescriptor<T> part,
final String topicName,
final boolean isKey) {
final URI topicId,
final TopicPart topicPart) {
try {
return serdeProviders.get(part.format());
} catch (final Exception e) {
throw new UnknownSerializationFormatException(part.format(), topicName, isKey, e);
throw new UnknownSerializationFormatException(part.format(), topicId, topicPart, e);
}
}

Expand All @@ -139,17 +145,19 @@ interface RegistryFactory {
private static final class UnknownSerializationFormatException extends RuntimeException {
UnknownSerializationFormatException(
final SerializationFormat format,
final String topicName,
final boolean isKey,
final URI topicId,
final TopicPart topicPart,
final Throwable cause) {
super(
"Unknown "
+ (isKey ? "key" : "value")
+ topicPart
+ " serialization format encountered."
+ " format="
+ format
+ ", topic="
+ topicName,
+ ", "
+ LoggingField.topicId
+ "="
+ topicId,
cause);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.mockito.quality.Strictness;
import org.mockito.stubbing.Answer;

@SuppressWarnings("resource")
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
class KafkaClientsExtensionProviderTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.creekservice.api.kafka.extension.config.ClustersProperties;
import org.creekservice.api.kafka.extension.resource.KafkaTopic;
import org.creekservice.api.kafka.metadata.CreatableKafkaTopic;
import org.creekservice.api.kafka.serde.provider.KafkaSerdeProviders;
import org.creekservice.api.kafka.test.service.TestServiceDescriptor;
import org.creekservice.api.kafka.test.service.UpstreamAggregateDescriptor;
import org.creekservice.internal.kafka.extension.resource.ResourceRegistry;
Expand Down Expand Up @@ -83,7 +84,7 @@ void setUp() {
.build(Set.of());

final ResourceRegistry registry =
new ResourceRegistryFactory()
new ResourceRegistryFactory(KafkaSerdeProviders.create())
.create(List.of(new TestServiceDescriptor()), clustersProperties);

try (Admin admin = Admin.create(clustersProperties.get(DEFAULT_CLUSTER_NAME))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.creekservice.api.kafka.extension.config.ClustersProperties;
import org.creekservice.api.kafka.metadata.CreatableKafkaTopic;
import org.creekservice.api.kafka.metadata.OwnedKafkaTopicOutput;
import org.creekservice.api.kafka.serde.provider.KafkaSerdeProviders;
import org.creekservice.test.TopicConfigBuilder;
import org.creekservice.test.TopicDescriptors;
import org.hamcrest.Description;
Expand Down Expand Up @@ -97,7 +98,7 @@ class KafkaTopicClientFunctionalTest {

@BeforeEach
void setUp() {
client = new KafkaTopicClient(clustersProperties);
client = new KafkaTopicClient(clustersProperties, KafkaSerdeProviders.create());

final Map<String, Object> defaultClusterProps =
Map.of(BOOTSTRAP_SERVERS_CONFIG, DEFAULT_CLUSTER.getBootstrapServers());
Expand Down
Loading

0 comments on commit bb0c3a5

Please sign in to comment.