Local schema cache weighted invalidation based on schema size #2598

Open · wants to merge 2 commits into master (changes shown from 1 commit)
@@ -66,6 +66,9 @@ public class MetricsContainer {
public static final String METRIC_NAME_JSON_SCHEMAS_DELETED = "json-schemas-deleted";
public static final String METRIC_NAME_PB_SCHEMAS_CREATED = "protobuf-schemas-created";
public static final String METRIC_NAME_PB_SCHEMAS_DELETED = "protobuf-schemas-deleted";
public static final String METRIC_NAME_SCHEMA_CACHE_SIZE = "schema-cache-size";
public static final String METRIC_NAME_SCHEMA_CACHE_HIT = "schema-cache-hit";
public static final String METRIC_NAME_SCHEMA_CACHE_MISS = "schema-cache-miss";

private final Metrics metrics;
private final Map<String, String> configuredTags;
@@ -86,6 +89,9 @@ public class MetricsContainer {
private final SchemaRegistryMetric avroSchemasDeleted;
private final SchemaRegistryMetric jsonSchemasDeleted;
private final SchemaRegistryMetric protobufSchemasDeleted;
private final SchemaRegistryMetric schemaCacheHit;
private final SchemaRegistryMetric schemaCacheMiss;
private final SchemaRegistryMetric schemaCacheSize;

private final MetricsContext metricsContext;

@@ -146,6 +152,12 @@ public MetricsContainer(SchemaRegistryConfig config, String kafkaClusterId) {

this.protobufSchemasDeleted = createMetric(METRIC_NAME_PB_SCHEMAS_DELETED,
"Number of deleted Protobuf schemas", new CumulativeCount());

this.schemaCacheHit = createMetric(METRIC_NAME_SCHEMA_CACHE_HIT,
    "Number of local schema cache hits");

this.schemaCacheMiss = createMetric(METRIC_NAME_SCHEMA_CACHE_MISS,
    "Number of local schema cache misses");

this.schemaCacheSize = createMetric(METRIC_NAME_SCHEMA_CACHE_SIZE,
    "Current size of the local schema cache");
}

public Metrics getMetrics() {
@@ -165,6 +165,18 @@ public class SchemaRegistryConfig extends RestConfig {
public static final String COMPATIBILITY_CONFIG = "avro.compatibility.level";
public static final String SCHEMA_COMPATIBILITY_CONFIG = "schema.compatibility.level";

/**
* <code>schema.cache.maximum.weight</code>
*/
public static final String SCHEMA_CACHE_MAXIMUM_WEIGHT_CONFIG = "schema.cache.maximum.weight";
public static final int SCHEMA_CACHE_MAXIMUM_WEIGHT_DEFAULT = 1000000;
Author comment:
This config means the sum of canonical schema lengths held in the cache must stay under 1,000,000 characters. The Guava cache docs are fuzzy about exactly how entries get invalidated (based on usage and their share of the total weight?), but we've set the limit so that large schemas should be invalidated over time. In practice this means more small schemas get cached (and that's good!), since quite a few of them fit under the cap. No significant performance degradation has materialized from making this change.
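To make that concrete, here is a minimal, self-contained Guava sketch (not part of this PR; the class name, keys and values are illustrative) using the same idea: each entry's weight is its string length, and eviction kicks in as the summed weight approaches the cap rather than at a fixed entry count.

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.Weigher;

public class WeightedCacheDemo {
  public static void main(String[] args) {
    Cache<String, String> cache = CacheBuilder.newBuilder()
        .maximumWeight(1_000_000)  // cap on the sum of entry weights, not on entry count
        .weigher((Weigher<String, String>) (subject, schema) -> schema.length())
        .build();

    cache.put("small-schema", "{\"type\":\"string\"}");  // weight 17
    // One huge schema would consume a large share of the 1,000,000 budget and
    // becomes a likely eviction candidate; many small schemas fit comfortably.
    System.out.println("entries cached: " + cache.size());
  }
}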


/**
* <code>schema.cache.use.weight</code>
*/
public static final String SCHEMA_CACHE_USE_WEIGHT_CONFIG = "schema.cache.use.weight";
public static final boolean SCHEMA_CACHE_USE_WEIGHT_DEFAULT = false;

/**
* <code>schema.cache.size</code>
*/
@@ -322,6 +334,10 @@ public class SchemaRegistryConfig extends RestConfig {
"The expiration in seconds for entries accessed in the cache.";
protected static final String SCHEMA_CANONICALIZE_ON_CONSUME_DOC =
"A list of schema types to canonicalize on consume, to be used if canonicalization changes.";
protected static final String SCHEMA_CACHE_MAXIMUM_WEIGHT_DOC =
"The maximum total weight of the local schema cache; an entry's weight is the length of its canonical schema string.";
protected static final String SCHEMA_CACHE_USE_WEIGHT_DOC =
"Whether the local schema cache is bounded by total weight rather than by entry count.";
protected static final String METADATA_ENCODER_SECRET_DOC =
"The secret used to encrypt and decrypt encoder keysets. "
+ "Use a random string with high entropy.";
@@ -513,6 +529,14 @@ DEFAULT_KAFKASTORE_WRITE_MAX_RETRIES, atLeast(0),
.define(SCHEMA_CANONICALIZE_ON_CONSUME_CONFIG, ConfigDef.Type.LIST, "",
ConfigDef.Importance.LOW, SCHEMA_CANONICALIZE_ON_CONSUME_DOC
)
.define(SCHEMA_CACHE_MAXIMUM_WEIGHT_CONFIG,
ConfigDef.Type.INT, SCHEMA_CACHE_MAXIMUM_WEIGHT_DEFAULT,
ConfigDef.Importance.LOW, SCHEMA_CACHE_MAXIMUM_WEIGHT_DOC
)
.define(SCHEMA_CACHE_USE_WEIGHT_CONFIG,
ConfigDef.Type.BOOLEAN, SCHEMA_CACHE_USE_WEIGHT_DEFAULT,
ConfigDef.Importance.LOW, SCHEMA_CACHE_USE_WEIGHT_DOC
)
.define(METADATA_ENCODER_SECRET_CONFIG, ConfigDef.Type.PASSWORD, null,
ConfigDef.Importance.HIGH, METADATA_ENCODER_SECRET_DOC
)
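For operators, turning the new behaviour on from the Schema Registry properties file would look like the following (the key names are the ones defined in this PR; the values are illustrative, with weight measured in canonical schema characters):

schema.cache.use.weight=true
schema.cache.maximum.weight=1000000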
@@ -66,6 +66,7 @@
import io.confluent.rest.RestConfig;
import io.confluent.rest.exceptions.RestException;
import io.confluent.rest.NamedURI;
import com.google.common.cache.Weigher;

import java.util.Map;
import java.util.List;
@@ -188,21 +189,36 @@ public KafkaSchemaRegistry(SchemaRegistryConfig config,
this.groupId = config.getString(SchemaRegistryConfig.SCHEMAREGISTRY_GROUP_ID_CONFIG);
this.metricsContainer = new MetricsContainer(config, this.kafkaClusterId);
this.providers = initProviders(config);
this.schemaCache = this.buildLocalSchemaCache();
this.lookupCache = lookupCache();
this.idGenerator = identityGenerator(config);
this.kafkaStore = kafkaStore(config);
this.metadataEncoder = new MetadataEncoderService(this);
this.ruleSetHandler = new RuleSetHandler();
}

private LoadingCache<RawSchema, ParsedSchema> buildLocalSchemaCache() {
  // Weight-based: bound the cache by total canonical schema length instead of entry count.
  if (config.getBoolean(SchemaRegistryConfig.SCHEMA_CACHE_USE_WEIGHT_CONFIG)) {
    return CacheBuilder.newBuilder()
        .maximumWeight(config.getInt(SchemaRegistryConfig.SCHEMA_CACHE_MAXIMUM_WEIGHT_CONFIG))
        .weigher((Weigher<RawSchema, ParsedSchema>) (k, v) -> v.canonicalString().length())
        .expireAfterAccess(
            config.getInt(SchemaRegistryConfig.SCHEMA_CACHE_EXPIRY_SECS_CONFIG), TimeUnit.SECONDS)
        .build(new CacheLoader<RawSchema, ParsedSchema>() {
          @Override
          public ParsedSchema load(RawSchema s) throws Exception {
            return loadSchema(s.getSchema(), s.isNew(), s.isNormalize());
          }
        });
  }
  return CacheBuilder.newBuilder()
      .maximumSize(config.getInt(SchemaRegistryConfig.SCHEMA_CACHE_SIZE_CONFIG))
      .expireAfterAccess(
          config.getInt(SchemaRegistryConfig.SCHEMA_CACHE_EXPIRY_SECS_CONFIG), TimeUnit.SECONDS)
      .build(new CacheLoader<RawSchema, ParsedSchema>() {
        @Override
        public ParsedSchema load(RawSchema s) throws Exception {
          return loadSchema(s.getSchema(), s.isNew(), s.isNormalize());
        }
      });
}

private Map<String, SchemaProvider> initProviders(SchemaRegistryConfig config) {
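This commit declares the schema-cache-hit/miss/size metrics but does not show how they are fed from the cache. A minimal sketch of one possible wiring, assuming the cache is built with Guava's recordStats() and that SchemaRegistryMetric accepts a record(double) style update (both are assumptions, not taken from this diff):

// Sketch only: requires CacheBuilder.recordStats() on the cache above;
// the metricsContainer getters and record(double) calls are hypothetical.
private void publishSchemaCacheMetrics() {
  com.google.common.cache.CacheStats stats = schemaCache.stats();
  metricsContainer.getSchemaCacheHit().record(stats.hitCount());      // cumulative hits
  metricsContainer.getSchemaCacheMiss().record(stats.missCount());    // cumulative misses
  metricsContainer.getSchemaCacheSize().record(schemaCache.size());   // current entry count
}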
@@ -33,6 +33,9 @@
import static io.confluent.kafka.schemaregistry.metrics.MetricsContainer.METRIC_NAME_DELETED_COUNT;
import static io.confluent.kafka.schemaregistry.metrics.MetricsContainer.METRIC_NAME_MASTER_SLAVE_ROLE;
import static io.confluent.kafka.schemaregistry.metrics.MetricsContainer.METRIC_NAME_REGISTERED_COUNT;
import static io.confluent.kafka.schemaregistry.metrics.MetricsContainer.METRIC_NAME_SCHEMA_CACHE_SIZE;
import static io.confluent.kafka.schemaregistry.metrics.MetricsContainer.METRIC_NAME_SCHEMA_CACHE_HIT;
import static io.confluent.kafka.schemaregistry.metrics.MetricsContainer.METRIC_NAME_SCHEMA_CACHE_MISS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

@@ -96,6 +99,45 @@ public void testSchemaCreatedCount() throws Exception {
assertEquals((double) schemaCount, mBeanServer.getAttribute(avroDeleted, METRIC_NAME_AVRO_SCHEMAS_DELETED));
}

@Test
public void testLocalSchemaCacheMetrics() throws Exception {
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
ObjectName schemaCacheSize =
new ObjectName("kafka.schema.registry:type=" + METRIC_NAME_SCHEMA_CACHE_SIZE);
ObjectName schemaCacheHit =
new ObjectName("kafka.schema.registry:type=" + METRIC_NAME_SCHEMA_CACHE_HIT);
ObjectName schemaCacheMiss =
new ObjectName("kafka.schema.registry:type=" + METRIC_NAME_SCHEMA_CACHE_MISS);

RestService service = restApp.restClient;
String subject = "testTopic1";
int schemaCount = 3;
List<String> schemas = TestUtils.getRandomCanonicalAvroString(schemaCount);

// test registering and verifying new schemas in subject1
int schemaIdCounter = 1;
for (int i = 0; i < schemaCount; i++) {
String schema = schemas.get(i);
TestUtils.registerAndVerifySchema(service, schema, schemaIdCounter++, subject);
}

// Re-registering schemas should only increase cache hits.
for (int i = 0; i < schemaCount; i++) {
String schemaString = schemas.get(i);
service.registerSchema(schemaString, subject);
}

// Deleting schemas should only increase cache hits.
for (Integer i = 1; i < schemaIdCounter; i++) {
assertEquals(i, service.deleteSchemaVersion(RestService.DEFAULT_REQUEST_PROPERTIES,
subject, i.toString()));
}

assertEquals((double) schemaCount,
    mBeanServer.getAttribute(schemaCacheSize, METRIC_NAME_SCHEMA_CACHE_SIZE));
assertEquals((double) schemaCount,
    mBeanServer.getAttribute(schemaCacheMiss, METRIC_NAME_SCHEMA_CACHE_MISS));
assertEquals((double) schemaCount * 3,
    mBeanServer.getAttribute(schemaCacheHit, METRIC_NAME_SCHEMA_CACHE_HIT));
}

@Test
public void testApiCallMetrics() throws Exception {
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();