From 7afe120fdc8a22460c32a196fdbc6bd4cc47cba4 Mon Sep 17 00:00:00 2001 From: Sarwar Bhuiyan Date: Mon, 4 Apr 2022 12:49:49 +0100 Subject: [PATCH 1/2] Abstracting Kafka Store from Store --- .gitignore | 1 + .../rest/SchemaRegistryRestApplication.java | 7 +- .../rest/resources/CompatibilityResource.java | 6 +- .../rest/resources/ConfigResource.java | 6 +- .../rest/resources/ModeResource.java | 7 +- .../rest/resources/SchemasResource.java | 6 +- .../resources/ServerMetadataResource.java | 9 +- .../resources/SubjectVersionsResource.java | 6 +- .../rest/resources/SubjectsResource.java | 6 +- .../storage/SchemaRegistry.java | 95 ++++++++++++++++++- findbugs/findbugs-exclude.xml | 10 ++ .../connect/json/JsonSchemaConverter.java | 9 +- 12 files changed, 138 insertions(+), 30 deletions(-) diff --git a/.gitignore b/.gitignore index b2243981722..b142f024e22 100644 --- a/.gitignore +++ b/.gitignore @@ -23,3 +23,4 @@ target/ # Log file from schema registry daemon logs/ nohup.out +/.metadata/ diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestApplication.java b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestApplication.java index ead64652cd0..d69f6700545 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestApplication.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestApplication.java @@ -29,6 +29,7 @@ import io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource; import io.confluent.kafka.schemaregistry.rest.resources.SubjectsResource; import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry; +import io.confluent.kafka.schemaregistry.storage.SchemaRegistry; import io.confluent.kafka.schemaregistry.storage.serialization.SchemaRegistrySerializer; import io.confluent.rest.Application; import io.confluent.rest.RestConfigException; @@ -47,7 +48,7 @@ public class SchemaRegistryRestApplication extends Application { private static final Logger log = LoggerFactory.getLogger(SchemaRegistryRestApplication.class); - private KafkaSchemaRegistry schemaRegistry = null; + private SchemaRegistry schemaRegistry = null; private List schemaRegistryResourceExtensions = null; public SchemaRegistryRestApplication(Properties props) throws RestConfigException { @@ -65,7 +66,7 @@ public SchemaRegistryRestApplication(SchemaRegistryConfig config) { } - protected KafkaSchemaRegistry initSchemaRegistry(SchemaRegistryConfig config) { + protected SchemaRegistry initSchemaRegistry(SchemaRegistryConfig config) { KafkaSchemaRegistry kafkaSchemaRegistry = null; try { kafkaSchemaRegistry = new KafkaSchemaRegistry( @@ -158,6 +159,6 @@ public void onShutdown() { // for testing purpose only public KafkaSchemaRegistry schemaRegistry() { - return schemaRegistry; + return (KafkaSchemaRegistry)schemaRegistry; } } diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/CompatibilityResource.java b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/CompatibilityResource.java index 5c834f1c0e5..a13de69393b 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/CompatibilityResource.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/CompatibilityResource.java @@ -26,7 +26,7 @@ import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryStoreException; import io.confluent.kafka.schemaregistry.rest.VersionId; import io.confluent.kafka.schemaregistry.rest.exceptions.Errors; -import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry; +import io.confluent.kafka.schemaregistry.storage.SchemaRegistry; import io.confluent.kafka.schemaregistry.utils.QualifiedSubject; import io.confluent.rest.annotations.PerformanceMetric; import io.swagger.v3.oas.annotations.Operation; @@ -59,9 +59,9 @@ public class CompatibilityResource { private static final Logger log = LoggerFactory.getLogger(CompatibilityResource.class); - private final KafkaSchemaRegistry schemaRegistry; + private final SchemaRegistry schemaRegistry; - public CompatibilityResource(KafkaSchemaRegistry schemaRegistry) { + public CompatibilityResource(SchemaRegistry schemaRegistry) { this.schemaRegistry = schemaRegistry; } diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/ConfigResource.java b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/ConfigResource.java index ed86f475acf..0071628345b 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/ConfigResource.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/ConfigResource.java @@ -28,7 +28,7 @@ import io.confluent.kafka.schemaregistry.exceptions.UnknownLeaderException; import io.confluent.kafka.schemaregistry.rest.exceptions.Errors; import io.confluent.kafka.schemaregistry.rest.exceptions.RestInvalidCompatibilityException; -import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry; +import io.confluent.kafka.schemaregistry.storage.SchemaRegistry; import io.confluent.kafka.schemaregistry.utils.QualifiedSubject; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; @@ -63,11 +63,11 @@ public class ConfigResource { private static final Logger log = LoggerFactory.getLogger(ConfigResource.class); - private final KafkaSchemaRegistry schemaRegistry; + private final SchemaRegistry schemaRegistry; private final RequestHeaderBuilder requestHeaderBuilder = new RequestHeaderBuilder(); - public ConfigResource(KafkaSchemaRegistry schemaRegistry) { + public ConfigResource(SchemaRegistry schemaRegistry) { this.schemaRegistry = schemaRegistry; } diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/ModeResource.java b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/ModeResource.java index 69071f4422a..26911803234 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/ModeResource.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/ModeResource.java @@ -15,6 +15,7 @@ package io.confluent.kafka.schemaregistry.rest.resources; +<<<<<<< HEAD import static io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.GLOBAL_RESOURCE_NAME; import com.google.common.base.CharMatcher; @@ -28,7 +29,7 @@ import io.confluent.kafka.schemaregistry.exceptions.UnknownLeaderException; import io.confluent.kafka.schemaregistry.rest.exceptions.Errors; import io.confluent.kafka.schemaregistry.rest.exceptions.RestInvalidModeException; -import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry; +import io.confluent.kafka.schemaregistry.storage.SchemaRegistry; import io.confluent.kafka.schemaregistry.utils.QualifiedSubject; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; @@ -64,11 +65,11 @@ public class ModeResource { private static final Logger log = LoggerFactory.getLogger(ModeResource.class); - private final KafkaSchemaRegistry schemaRegistry; + private final SchemaRegistry schemaRegistry; private final RequestHeaderBuilder requestHeaderBuilder = new RequestHeaderBuilder(); - public ModeResource(KafkaSchemaRegistry schemaRegistry) { + public ModeResource(SchemaRegistry schemaRegistry) { this.schemaRegistry = schemaRegistry; } diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SchemasResource.java b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SchemasResource.java index adbee7b3bb2..9e37544240f 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SchemasResource.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SchemasResource.java @@ -22,7 +22,7 @@ import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryException; import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryStoreException; import io.confluent.kafka.schemaregistry.rest.exceptions.Errors; -import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry; +import io.confluent.kafka.schemaregistry.storage.SchemaRegistry; import io.confluent.rest.annotations.PerformanceMetric; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; @@ -54,9 +54,9 @@ public class SchemasResource { private static final Logger log = LoggerFactory.getLogger(SchemasResource.class); - private final KafkaSchemaRegistry schemaRegistry; + private final SchemaRegistry schemaRegistry; - public SchemasResource(KafkaSchemaRegistry schemaRegistry) { + public SchemasResource(SchemaRegistry schemaRegistry) { this.schemaRegistry = schemaRegistry; } diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/ServerMetadataResource.java b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/ServerMetadataResource.java index 7434ec26056..d06cd9296a6 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/ServerMetadataResource.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/ServerMetadataResource.java @@ -17,7 +17,7 @@ import io.confluent.kafka.schemaregistry.client.rest.Versions; import io.confluent.kafka.schemaregistry.client.rest.entities.ServerClusterId; -import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry; +import io.confluent.kafka.schemaregistry.storage.SchemaRegistry; import io.confluent.rest.annotations.PerformanceMetric; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.responses.ApiResponse; @@ -39,10 +39,11 @@ public class ServerMetadataResource { private static final Logger log = LoggerFactory.getLogger(ServerMetadataResource.class); - private final KafkaSchemaRegistry schemaRegistry; - public ServerMetadataResource(KafkaSchemaRegistry schemaRegistry) { - this.schemaRegistry = schemaRegistry; + private final SchemaRegistry schemaRegistry; + + public ServerMetadataResource(SchemaRegistry schemaRegistry) { + this.schemaRegistry = schemaRegistry; } @GET diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SubjectVersionsResource.java b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SubjectVersionsResource.java index 2235e8958de..10b6bee0a22 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SubjectVersionsResource.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SubjectVersionsResource.java @@ -39,7 +39,7 @@ import io.confluent.kafka.schemaregistry.exceptions.UnknownLeaderException; import io.confluent.kafka.schemaregistry.rest.VersionId; import io.confluent.kafka.schemaregistry.rest.exceptions.Errors; -import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry; +import io.confluent.kafka.schemaregistry.storage.SchemaRegistry; import io.confluent.kafka.schemaregistry.utils.QualifiedSubject; import io.confluent.rest.annotations.PerformanceMetric; import io.swagger.v3.oas.annotations.Operation; @@ -79,7 +79,7 @@ public class SubjectVersionsResource { private static final Logger log = LoggerFactory.getLogger(SubjectVersionsResource.class); - private final KafkaSchemaRegistry schemaRegistry; + private final SchemaRegistry schemaRegistry; private final RequestHeaderBuilder requestHeaderBuilder = new RequestHeaderBuilder(); @@ -88,7 +88,7 @@ public class SubjectVersionsResource { + "returns the last registered schema under the specified subject. Note that there may be a " + "new latest schema that gets registered right after this request is served."; - public SubjectVersionsResource(KafkaSchemaRegistry registry) { + public SubjectVersionsResource(SchemaRegistry registry) { this.schemaRegistry = registry; } diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SubjectsResource.java b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SubjectsResource.java index a3eb5120cf1..4b1dcbff885 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SubjectsResource.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SubjectsResource.java @@ -27,7 +27,7 @@ import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryTimeoutException; import io.confluent.kafka.schemaregistry.exceptions.SubjectNotSoftDeletedException; import io.confluent.kafka.schemaregistry.rest.exceptions.Errors; -import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry; +import io.confluent.kafka.schemaregistry.storage.SchemaRegistry; import io.confluent.kafka.schemaregistry.utils.QualifiedSubject; import io.confluent.rest.annotations.PerformanceMetric; import io.swagger.v3.oas.annotations.Operation; @@ -67,10 +67,10 @@ public class SubjectsResource { private static final Logger log = LoggerFactory.getLogger(SubjectsResource.class); - private final KafkaSchemaRegistry schemaRegistry; + private final SchemaRegistry schemaRegistry; private final RequestHeaderBuilder requestHeaderBuilder = new RequestHeaderBuilder(); - public SubjectsResource(KafkaSchemaRegistry schemaRegistry) { + public SubjectsResource(SchemaRegistry schemaRegistry) { this.schemaRegistry = schemaRegistry; } diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/SchemaRegistry.java b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/SchemaRegistry.java index c6a519b39ec..29db7bee682 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/SchemaRegistry.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/SchemaRegistry.java @@ -23,7 +23,14 @@ import io.confluent.kafka.schemaregistry.client.SchemaVersionFetcher; import io.confluent.kafka.schemaregistry.client.rest.entities.Schema; import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaString; +import io.confluent.kafka.schemaregistry.client.rest.entities.SubjectVersion; +import io.confluent.kafka.schemaregistry.CompatibilityLevel; +import io.confluent.kafka.schemaregistry.exceptions.OperationNotPermittedException; import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryException; +import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryRequestForwardingException; +import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryStoreException; +import io.confluent.kafka.schemaregistry.exceptions.UnknownLeaderException; +import io.confluent.kafka.schemaregistry.metrics.MetricsContainer; import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig; import io.confluent.kafka.schemaregistry.utils.QualifiedSubject; @@ -53,6 +60,9 @@ Schema get(String subject, int version, boolean returnDeletedSchema) throws SchemaRegistryException; SchemaString get(int id, String subject) throws SchemaRegistryException; + + SchemaString get(int id, String subject, String format, boolean fetchMaxId) + throws SchemaRegistryException; default Set listSubjects() throws SchemaRegistryException { return listSubjects(false); @@ -60,7 +70,12 @@ default Set listSubjects() throws SchemaRegistryException { Set listSubjects(boolean returnDeletedSubjects) throws SchemaRegistryException; - + + Set listSubjectsForId(int id, String subject) throws SchemaRegistryException; + + Set listSubjectsForId(int id, String subject, boolean returnDeleted) + throws SchemaRegistryException; + Iterator getAllVersions(String subject, boolean returnDeletedSchemas) throws SchemaRegistryException; @@ -96,10 +111,88 @@ List isCompatible(String subject, void deleteSchemaVersion(String subject, Schema schema, boolean permanentDelete) throws SchemaRegistryException; + + void updateConfigOrForward(String subject, CompatibilityLevel newCompatibilityLevel, + Map headerProperties) + throws SchemaRegistryStoreException, SchemaRegistryRequestForwardingException, + UnknownLeaderException, OperationNotPermittedException; + + void updateCompatibilityLevel(String subject, CompatibilityLevel newCompatibilityLevel) + throws SchemaRegistryStoreException, OperationNotPermittedException, + UnknownLeaderException; + + CompatibilityLevel getCompatibilityLevel(String subject) + throws SchemaRegistryStoreException; + + CompatibilityLevel getCompatibilityLevelInScope(String subject) + throws SchemaRegistryStoreException; + + void deleteSubjectCompatibilityConfig(String subject) + throws SchemaRegistryStoreException, OperationNotPermittedException; + + void deleteSubjectCompatibilityConfigOrForward(String subject, + Map headerProperties) + throws SchemaRegistryStoreException, SchemaRegistryRequestForwardingException, + OperationNotPermittedException, UnknownLeaderException; + + boolean hasSubjects(String subject, + boolean lookupDeletedSubjects) + throws SchemaRegistryStoreException; + + + void deleteSubjectModeOrForward(String subject, Map headerProperties) + throws SchemaRegistryStoreException, SchemaRegistryRequestForwardingException, + OperationNotPermittedException, UnknownLeaderException; + + List deleteSubjectOrForward( + Map requestProperties, + String subject, + boolean permanentDelete) throws SchemaRegistryException; + + + + List getReferencedBy(String subject, VersionId versionId) + throws SchemaRegistryException; + + Schema validateAndGetSchema(String subject, VersionId versionId, boolean + returnDeletedSchema) throws SchemaRegistryException; + + int registerOrForward(String subject, + Schema schema, + Map headerProperties) + throws SchemaRegistryException; + + void setModeOrForward(String subject, Mode mode, Map headerProperties) + throws SchemaRegistryStoreException, SchemaRegistryRequestForwardingException, + OperationNotPermittedException, UnknownLeaderException; + + Mode getModeInScope(String subject) throws SchemaRegistryStoreException; + + Mode getMode(String subject) throws SchemaRegistryStoreException; + + String getKafkaClusterId(); + + boolean schemaVersionExists(String subject, VersionId versionId, boolean + returnDeletedSchema) throws SchemaRegistryException; + + List listVersionsForId(int id, String subject) + throws SchemaRegistryException; + + List listVersionsForId(int id, String subject, boolean lookupDeleted) + throws SchemaRegistryException; + + void deleteSchemaVersionOrForward( + Map headerProperties, String subject, + Schema schema, boolean permanentDelete) throws SchemaRegistryException; + + MetricsContainer getMetricsContainer(); // TODO maybe move to abstract class + default String tenant() { return DEFAULT_TENANT; } + + /** * Can be used by subclasses to implement multi-tenancy * diff --git a/findbugs/findbugs-exclude.xml b/findbugs/findbugs-exclude.xml index 7d98b8f52a2..6c0dde8cf2c 100644 --- a/findbugs/findbugs-exclude.xml +++ b/findbugs/findbugs-exclude.xml @@ -98,6 +98,16 @@ For a detailed description of findbugs bug categories, see http://findbugs.sourc + + + + + + + + + + diff --git a/json-schema-converter/src/main/java/io/confluent/connect/json/JsonSchemaConverter.java b/json-schema-converter/src/main/java/io/confluent/connect/json/JsonSchemaConverter.java index 28c840dfebc..43e10fc0b99 100644 --- a/json-schema-converter/src/main/java/io/confluent/connect/json/JsonSchemaConverter.java +++ b/json-schema-converter/src/main/java/io/confluent/connect/json/JsonSchemaConverter.java @@ -15,8 +15,9 @@ package io.confluent.connect.json; -import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.annotations.VisibleForTesting; +import java.util.Collections; +import java.util.Map; + import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.errors.InvalidConfigurationException; import org.apache.kafka.common.errors.SerializationException; @@ -25,8 +26,8 @@ import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.storage.Converter; -import java.util.Collections; -import java.util.Map; +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.annotations.VisibleForTesting; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; From 58caae27ae07be52cb3e904ba2eedda8c577d3ad Mon Sep 17 00:00:00 2001 From: Sarwar Bhuiyan Date: Mon, 4 Apr 2022 16:12:19 +0100 Subject: [PATCH 2/2] Refactored code to be in line with 7.3.0 --- .../client/rest/entities/ErrorMessage.java | 4 +- .../client/rest/entities/ServerClusterId.java | 4 +- .../rest/SchemaRegistryRestApplication.java | 2 +- .../rest/resources/ConfigResource.java | 2 +- .../rest/resources/ContextsResource.java | 6 +-- .../rest/resources/ModeResource.java | 4 +- .../resources/ServerMetadataResource.java | 10 ++++- .../resources/SubjectVersionsResource.java | 2 +- .../storage/KafkaSchemaRegistry.java | 4 +- .../storage/SchemaRegistry.java | 41 ++++++++++++------- findbugs/findbugs-exclude.xml | 10 ----- 11 files changed, 49 insertions(+), 40 deletions(-) diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/ErrorMessage.java b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/ErrorMessage.java index b3d73411dc9..e42ba6711ec 100644 --- a/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/ErrorMessage.java +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/ErrorMessage.java @@ -43,8 +43,8 @@ public int getErrorCode() { } @JsonProperty("error_code") - public void setErrorCode(int error_code) { - this.errorCode = error_code; + public void setErrorCode(int errorCode) { + this.errorCode = errorCode; } @JsonProperty diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/ServerClusterId.java b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/ServerClusterId.java index 59171e7e020..312055413e7 100644 --- a/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/ServerClusterId.java +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/ServerClusterId.java @@ -51,7 +51,9 @@ public Map getScope() { public static ServerClusterId of(String kafkaClusterId, String schemaRegistryClusterId) { Map clusters = new HashMap<>(); - clusters.put(KAFKA_CLUSTER, kafkaClusterId); + if (kafkaClusterId != null) { + clusters.put(KAFKA_CLUSTER, kafkaClusterId); + } clusters.put(SCHEMA_REGISTRY_CLUSTER, schemaRegistryClusterId); Map serverClusterId = new HashMap<>(); serverClusterId.put("path", Collections.emptyList()); diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestApplication.java b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestApplication.java index d69f6700545..5236d977609 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestApplication.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestApplication.java @@ -67,7 +67,7 @@ public SchemaRegistryRestApplication(SchemaRegistryConfig config) { protected SchemaRegistry initSchemaRegistry(SchemaRegistryConfig config) { - KafkaSchemaRegistry kafkaSchemaRegistry = null; + SchemaRegistry kafkaSchemaRegistry = null; try { kafkaSchemaRegistry = new KafkaSchemaRegistry( config, diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/ConfigResource.java b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/ConfigResource.java index 0071628345b..19a22eda84d 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/ConfigResource.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/ConfigResource.java @@ -15,7 +15,7 @@ package io.confluent.kafka.schemaregistry.rest.resources; -import static io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.GLOBAL_RESOURCE_NAME; +import static io.confluent.kafka.schemaregistry.storage.SchemaRegistry.GLOBAL_RESOURCE_NAME; import com.google.common.base.CharMatcher; import io.confluent.kafka.schemaregistry.CompatibilityLevel; diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/ContextsResource.java b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/ContextsResource.java index e905276ffc9..3ee9838d172 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/ContextsResource.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/ContextsResource.java @@ -19,7 +19,7 @@ import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryException; import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryStoreException; import io.confluent.kafka.schemaregistry.rest.exceptions.Errors; -import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry; +import io.confluent.kafka.schemaregistry.storage.SchemaRegistry; import io.confluent.rest.annotations.PerformanceMetric; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.media.ArraySchema; @@ -42,9 +42,9 @@ Versions.JSON, Versions.GENERIC_REQUEST}) public class ContextsResource { - private final KafkaSchemaRegistry schemaRegistry; + private final SchemaRegistry schemaRegistry; - public ContextsResource(KafkaSchemaRegistry schemaRegistry) { + public ContextsResource(SchemaRegistry schemaRegistry) { this.schemaRegistry = schemaRegistry; } diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/ModeResource.java b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/ModeResource.java index 26911803234..06052658709 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/ModeResource.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/ModeResource.java @@ -15,9 +15,7 @@ package io.confluent.kafka.schemaregistry.rest.resources; -<<<<<<< HEAD -import static io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.GLOBAL_RESOURCE_NAME; - +import static io.confluent.kafka.schemaregistry.storage.SchemaRegistry.GLOBAL_RESOURCE_NAME; import com.google.common.base.CharMatcher; import io.confluent.kafka.schemaregistry.client.rest.Versions; import io.confluent.kafka.schemaregistry.client.rest.entities.Mode; diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/ServerMetadataResource.java b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/ServerMetadataResource.java index d06cd9296a6..5a6db6817c3 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/ServerMetadataResource.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/ServerMetadataResource.java @@ -17,6 +17,7 @@ import io.confluent.kafka.schemaregistry.client.rest.Versions; import io.confluent.kafka.schemaregistry.client.rest.entities.ServerClusterId; +import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry; import io.confluent.kafka.schemaregistry.storage.SchemaRegistry; import io.confluent.rest.annotations.PerformanceMetric; import io.swagger.v3.oas.annotations.Operation; @@ -43,7 +44,7 @@ public class ServerMetadataResource { private final SchemaRegistry schemaRegistry; public ServerMetadataResource(SchemaRegistry schemaRegistry) { - this.schemaRegistry = schemaRegistry; + this.schemaRegistry = schemaRegistry; } @GET @@ -54,8 +55,13 @@ public ServerMetadataResource(SchemaRegistry schemaRegistry) { }) @PerformanceMetric("metadata.id") public ServerClusterId getClusterId() { - String kafkaClusterId = schemaRegistry.getKafkaClusterId(); String schemaRegistryClusterId = schemaRegistry.getGroupId(); + + String kafkaClusterId = null; + if (schemaRegistry instanceof KafkaSchemaRegistry) { + kafkaClusterId = ((KafkaSchemaRegistry)schemaRegistry).getKafkaClusterId(); + } + return ServerClusterId.of(kafkaClusterId, schemaRegistryClusterId); } } diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SubjectVersionsResource.java b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SubjectVersionsResource.java index 10b6bee0a22..bbb94a3b344 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SubjectVersionsResource.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SubjectVersionsResource.java @@ -15,7 +15,7 @@ package io.confluent.kafka.schemaregistry.rest.resources; -import static io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.GLOBAL_RESOURCE_NAME; +import static io.confluent.kafka.schemaregistry.storage.SchemaRegistry.GLOBAL_RESOURCE_NAME; import com.google.common.base.CharMatcher; import io.confluent.kafka.schemaregistry.avro.AvroSchema; diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java index a253b1bc617..f7b95510e3e 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java @@ -104,8 +104,6 @@ public class KafkaSchemaRegistry implements SchemaRegistry, LeaderAwareSchemaReg * Schema versions under a particular subject are indexed from MIN_VERSION. */ public static final int MIN_VERSION = 1; - // Subject name under which global permissions are stored. - public static final String GLOBAL_RESOURCE_NAME = "__GLOBAL"; public static final int MAX_VERSION = Integer.MAX_VALUE; private static final Logger log = LoggerFactory.getLogger(KafkaSchemaRegistry.class); @@ -588,6 +586,7 @@ private boolean isReadOnlyMode(String subject) throws SchemaRegistryStoreExcepti return subjectMode == Mode.READONLY || subjectMode == Mode.READONLY_OVERRIDE; } + @Override public int registerOrForward(String subject, Schema schema, boolean normalize, @@ -1711,6 +1710,7 @@ public void setMode(String subject, Mode mode, boolean force) } } + @Override public void setModeOrForward(String subject, Mode mode, boolean force, Map headerProperties) throws SchemaRegistryStoreException, SchemaRegistryRequestForwardingException, diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/SchemaRegistry.java b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/SchemaRegistry.java index 29db7bee682..84c07e14631 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/SchemaRegistry.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/SchemaRegistry.java @@ -32,11 +32,15 @@ import io.confluent.kafka.schemaregistry.exceptions.UnknownLeaderException; import io.confluent.kafka.schemaregistry.metrics.MetricsContainer; import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig; +import io.confluent.kafka.schemaregistry.rest.VersionId; import io.confluent.kafka.schemaregistry.utils.QualifiedSubject; public interface SchemaRegistry extends SchemaVersionFetcher { String DEFAULT_TENANT = QualifiedSubject.DEFAULT_TENANT; + + // Subject name under which global permissions are stored. + public static final String GLOBAL_RESOURCE_NAME = "__GLOBAL"; void init() throws SchemaRegistryException; @@ -127,13 +131,10 @@ CompatibilityLevel getCompatibilityLevel(String subject) CompatibilityLevel getCompatibilityLevelInScope(String subject) throws SchemaRegistryStoreException; - void deleteSubjectCompatibilityConfig(String subject) - throws SchemaRegistryStoreException, OperationNotPermittedException; - - void deleteSubjectCompatibilityConfigOrForward(String subject, - Map headerProperties) - throws SchemaRegistryStoreException, SchemaRegistryRequestForwardingException, - OperationNotPermittedException, UnknownLeaderException; + public void deleteCompatibilityConfigOrForward(String subject, + Map headerProperties) + throws SchemaRegistryStoreException, SchemaRegistryRequestForwardingException, + OperationNotPermittedException, UnknownLeaderException; boolean hasSubjects(String subject, boolean lookupDeletedSubjects) @@ -154,23 +155,33 @@ List deleteSubjectOrForward( List getReferencedBy(String subject, VersionId versionId) throws SchemaRegistryException; - Schema validateAndGetSchema(String subject, VersionId versionId, boolean - returnDeletedSchema) throws SchemaRegistryException; + public List listContexts() throws SchemaRegistryException; - int registerOrForward(String subject, + public Schema lookUpSchemaUnderSubjectUsingContexts( + String subject, Schema schema, boolean normalize, boolean lookupDeletedSchema) + throws SchemaRegistryException; + + public Set listSubjectsWithPrefix(String prefix, boolean returnDeletedSubjects) + throws SchemaRegistryException; + + public String getGroupId(); + + public int registerOrForward(String subject, Schema schema, + boolean normalize, Map headerProperties) - throws SchemaRegistryException; + throws SchemaRegistryException; - void setModeOrForward(String subject, Mode mode, Map headerProperties) + public void setModeOrForward(String subject, Mode mode, boolean force, + Map headerProperties) throws SchemaRegistryStoreException, SchemaRegistryRequestForwardingException, - OperationNotPermittedException, UnknownLeaderException; + OperationNotPermittedException, UnknownLeaderException; Mode getModeInScope(String subject) throws SchemaRegistryStoreException; Mode getMode(String subject) throws SchemaRegistryStoreException; - String getKafkaClusterId(); + //String getKafkaClusterId(); boolean schemaVersionExists(String subject, VersionId versionId, boolean returnDeletedSchema) throws SchemaRegistryException; @@ -191,6 +202,8 @@ default String tenant() { return DEFAULT_TENANT; } + public Schema getUsingContexts(String subject, int version, boolean + returnDeletedSchema) throws SchemaRegistryException; /** diff --git a/findbugs/findbugs-exclude.xml b/findbugs/findbugs-exclude.xml index 6c0dde8cf2c..7d98b8f52a2 100644 --- a/findbugs/findbugs-exclude.xml +++ b/findbugs/findbugs-exclude.xml @@ -98,16 +98,6 @@ For a detailed description of findbugs bug categories, see http://findbugs.sourc - - - - - - - - - -