Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka store abstraction #2226

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ target/
# Log file from schema registry daemon
logs/
nohup.out
/.metadata/
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ public int getErrorCode() {
}

@JsonProperty("error_code")
public void setErrorCode(int error_code) {
this.errorCode = error_code;
public void setErrorCode(int errorCode) {
this.errorCode = errorCode;
}

@JsonProperty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ public Map<String, Object> getScope() {

public static ServerClusterId of(String kafkaClusterId, String schemaRegistryClusterId) {
Map<String, Object> clusters = new HashMap<>();
clusters.put(KAFKA_CLUSTER, kafkaClusterId);
if (kafkaClusterId != null) {
clusters.put(KAFKA_CLUSTER, kafkaClusterId);
}
clusters.put(SCHEMA_REGISTRY_CLUSTER, schemaRegistryClusterId);
Map<String, Object> serverClusterId = new HashMap<>();
serverClusterId.put("path", Collections.emptyList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource;
import io.confluent.kafka.schemaregistry.rest.resources.SubjectsResource;
import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.serialization.SchemaRegistrySerializer;
import io.confluent.rest.Application;
import io.confluent.rest.RestConfigException;
Expand All @@ -47,7 +48,7 @@
public class SchemaRegistryRestApplication extends Application<SchemaRegistryConfig> {

private static final Logger log = LoggerFactory.getLogger(SchemaRegistryRestApplication.class);
private KafkaSchemaRegistry schemaRegistry = null;
private SchemaRegistry schemaRegistry = null;
private List<SchemaRegistryResourceExtension> schemaRegistryResourceExtensions = null;

public SchemaRegistryRestApplication(Properties props) throws RestConfigException {
Expand All @@ -65,8 +66,8 @@ public SchemaRegistryRestApplication(SchemaRegistryConfig config) {
}


protected KafkaSchemaRegistry initSchemaRegistry(SchemaRegistryConfig config) {
KafkaSchemaRegistry kafkaSchemaRegistry = null;
protected SchemaRegistry initSchemaRegistry(SchemaRegistryConfig config) {
SchemaRegistry kafkaSchemaRegistry = null;
try {
kafkaSchemaRegistry = new KafkaSchemaRegistry(
config,
Expand Down Expand Up @@ -158,6 +159,6 @@ public void onShutdown() {

// for testing purpose only
public KafkaSchemaRegistry schemaRegistry() {
return schemaRegistry;
return (KafkaSchemaRegistry)schemaRegistry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryStoreException;
import io.confluent.kafka.schemaregistry.rest.VersionId;
import io.confluent.kafka.schemaregistry.rest.exceptions.Errors;
import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistry;
import io.confluent.kafka.schemaregistry.utils.QualifiedSubject;
import io.confluent.rest.annotations.PerformanceMetric;
import io.swagger.v3.oas.annotations.Operation;
Expand Down Expand Up @@ -59,9 +59,9 @@
public class CompatibilityResource {

private static final Logger log = LoggerFactory.getLogger(CompatibilityResource.class);
private final KafkaSchemaRegistry schemaRegistry;
private final SchemaRegistry schemaRegistry;

public CompatibilityResource(KafkaSchemaRegistry schemaRegistry) {
public CompatibilityResource(SchemaRegistry schemaRegistry) {
this.schemaRegistry = schemaRegistry;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

package io.confluent.kafka.schemaregistry.rest.resources;

import static io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.GLOBAL_RESOURCE_NAME;
import static io.confluent.kafka.schemaregistry.storage.SchemaRegistry.GLOBAL_RESOURCE_NAME;

import com.google.common.base.CharMatcher;
import io.confluent.kafka.schemaregistry.CompatibilityLevel;
Expand All @@ -28,7 +28,7 @@
import io.confluent.kafka.schemaregistry.exceptions.UnknownLeaderException;
import io.confluent.kafka.schemaregistry.rest.exceptions.Errors;
import io.confluent.kafka.schemaregistry.rest.exceptions.RestInvalidCompatibilityException;
import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistry;
import io.confluent.kafka.schemaregistry.utils.QualifiedSubject;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
Expand Down Expand Up @@ -63,11 +63,11 @@
public class ConfigResource {

private static final Logger log = LoggerFactory.getLogger(ConfigResource.class);
private final KafkaSchemaRegistry schemaRegistry;
private final SchemaRegistry schemaRegistry;

private final RequestHeaderBuilder requestHeaderBuilder = new RequestHeaderBuilder();

public ConfigResource(KafkaSchemaRegistry schemaRegistry) {
public ConfigResource(SchemaRegistry schemaRegistry) {
this.schemaRegistry = schemaRegistry;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryException;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryStoreException;
import io.confluent.kafka.schemaregistry.rest.exceptions.Errors;
import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistry;
import io.confluent.rest.annotations.PerformanceMetric;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.media.ArraySchema;
Expand All @@ -42,9 +42,9 @@
Versions.JSON, Versions.GENERIC_REQUEST})
public class ContextsResource {

private final KafkaSchemaRegistry schemaRegistry;
private final SchemaRegistry schemaRegistry;

public ContextsResource(KafkaSchemaRegistry schemaRegistry) {
public ContextsResource(SchemaRegistry schemaRegistry) {
this.schemaRegistry = schemaRegistry;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@

package io.confluent.kafka.schemaregistry.rest.resources;

import static io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.GLOBAL_RESOURCE_NAME;

import static io.confluent.kafka.schemaregistry.storage.SchemaRegistry.GLOBAL_RESOURCE_NAME;
import com.google.common.base.CharMatcher;
import io.confluent.kafka.schemaregistry.client.rest.Versions;
import io.confluent.kafka.schemaregistry.client.rest.entities.Mode;
Expand All @@ -28,7 +27,7 @@
import io.confluent.kafka.schemaregistry.exceptions.UnknownLeaderException;
import io.confluent.kafka.schemaregistry.rest.exceptions.Errors;
import io.confluent.kafka.schemaregistry.rest.exceptions.RestInvalidModeException;
import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistry;
import io.confluent.kafka.schemaregistry.utils.QualifiedSubject;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
Expand Down Expand Up @@ -64,11 +63,11 @@
public class ModeResource {

private static final Logger log = LoggerFactory.getLogger(ModeResource.class);
private final KafkaSchemaRegistry schemaRegistry;
private final SchemaRegistry schemaRegistry;

private final RequestHeaderBuilder requestHeaderBuilder = new RequestHeaderBuilder();

public ModeResource(KafkaSchemaRegistry schemaRegistry) {
public ModeResource(SchemaRegistry schemaRegistry) {
this.schemaRegistry = schemaRegistry;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryException;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryStoreException;
import io.confluent.kafka.schemaregistry.rest.exceptions.Errors;
import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistry;
import io.confluent.rest.annotations.PerformanceMetric;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
Expand Down Expand Up @@ -54,9 +54,9 @@
public class SchemasResource {

private static final Logger log = LoggerFactory.getLogger(SchemasResource.class);
private final KafkaSchemaRegistry schemaRegistry;
private final SchemaRegistry schemaRegistry;

public SchemasResource(KafkaSchemaRegistry schemaRegistry) {
public SchemasResource(SchemaRegistry schemaRegistry) {
this.schemaRegistry = schemaRegistry;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.confluent.kafka.schemaregistry.client.rest.Versions;
import io.confluent.kafka.schemaregistry.client.rest.entities.ServerClusterId;
import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistry;
import io.confluent.rest.annotations.PerformanceMetric;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
Expand All @@ -39,9 +40,10 @@
public class ServerMetadataResource {

private static final Logger log = LoggerFactory.getLogger(ServerMetadataResource.class);
private final KafkaSchemaRegistry schemaRegistry;

public ServerMetadataResource(KafkaSchemaRegistry schemaRegistry) {
private final SchemaRegistry schemaRegistry;

public ServerMetadataResource(SchemaRegistry schemaRegistry) {
this.schemaRegistry = schemaRegistry;
}

Expand All @@ -53,8 +55,13 @@ public ServerMetadataResource(KafkaSchemaRegistry schemaRegistry) {
})
@PerformanceMetric("metadata.id")
public ServerClusterId getClusterId() {
String kafkaClusterId = schemaRegistry.getKafkaClusterId();
String schemaRegistryClusterId = schemaRegistry.getGroupId();

String kafkaClusterId = null;
if (schemaRegistry instanceof KafkaSchemaRegistry) {
kafkaClusterId = ((KafkaSchemaRegistry)schemaRegistry).getKafkaClusterId();
}

return ServerClusterId.of(kafkaClusterId, schemaRegistryClusterId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

package io.confluent.kafka.schemaregistry.rest.resources;

import static io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.GLOBAL_RESOURCE_NAME;
import static io.confluent.kafka.schemaregistry.storage.SchemaRegistry.GLOBAL_RESOURCE_NAME;

import com.google.common.base.CharMatcher;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
Expand All @@ -39,7 +39,7 @@
import io.confluent.kafka.schemaregistry.exceptions.UnknownLeaderException;
import io.confluent.kafka.schemaregistry.rest.VersionId;
import io.confluent.kafka.schemaregistry.rest.exceptions.Errors;
import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistry;
import io.confluent.kafka.schemaregistry.utils.QualifiedSubject;
import io.confluent.rest.annotations.PerformanceMetric;
import io.swagger.v3.oas.annotations.Operation;
Expand Down Expand Up @@ -79,7 +79,7 @@ public class SubjectVersionsResource {

private static final Logger log = LoggerFactory.getLogger(SubjectVersionsResource.class);

private final KafkaSchemaRegistry schemaRegistry;
private final SchemaRegistry schemaRegistry;

private final RequestHeaderBuilder requestHeaderBuilder = new RequestHeaderBuilder();

Expand All @@ -88,7 +88,7 @@ public class SubjectVersionsResource {
+ "returns the last registered schema under the specified subject. Note that there may be a "
+ "new latest schema that gets registered right after this request is served.";

public SubjectVersionsResource(KafkaSchemaRegistry registry) {
public SubjectVersionsResource(SchemaRegistry registry) {
this.schemaRegistry = registry;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryTimeoutException;
import io.confluent.kafka.schemaregistry.exceptions.SubjectNotSoftDeletedException;
import io.confluent.kafka.schemaregistry.rest.exceptions.Errors;
import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistry;
import io.confluent.kafka.schemaregistry.utils.QualifiedSubject;
import io.confluent.rest.annotations.PerformanceMetric;
import io.swagger.v3.oas.annotations.Operation;
Expand Down Expand Up @@ -67,10 +67,10 @@
public class SubjectsResource {

private static final Logger log = LoggerFactory.getLogger(SubjectsResource.class);
private final KafkaSchemaRegistry schemaRegistry;
private final SchemaRegistry schemaRegistry;
private final RequestHeaderBuilder requestHeaderBuilder = new RequestHeaderBuilder();

public SubjectsResource(KafkaSchemaRegistry schemaRegistry) {
public SubjectsResource(SchemaRegistry schemaRegistry) {
this.schemaRegistry = schemaRegistry;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,6 @@ public class KafkaSchemaRegistry implements SchemaRegistry, LeaderAwareSchemaReg
* Schema versions under a particular subject are indexed from MIN_VERSION.
*/
public static final int MIN_VERSION = 1;
// Subject name under which global permissions are stored.
public static final String GLOBAL_RESOURCE_NAME = "__GLOBAL";
public static final int MAX_VERSION = Integer.MAX_VALUE;
private static final Logger log = LoggerFactory.getLogger(KafkaSchemaRegistry.class);

Expand Down Expand Up @@ -588,6 +586,7 @@ private boolean isReadOnlyMode(String subject) throws SchemaRegistryStoreExcepti
return subjectMode == Mode.READONLY || subjectMode == Mode.READONLY_OVERRIDE;
}

@Override
public int registerOrForward(String subject,
Schema schema,
boolean normalize,
Expand Down Expand Up @@ -1711,6 +1710,7 @@ public void setMode(String subject, Mode mode, boolean force)
}
}

@Override
public void setModeOrForward(String subject, Mode mode, boolean force,
Map<String, String> headerProperties)
throws SchemaRegistryStoreException, SchemaRegistryRequestForwardingException,
Expand Down
Loading