Skip to content

Commit

Permalink
pagination for subjects
Browse files Browse the repository at this point in the history
  • Loading branch information
djibodu committed Nov 29, 2024
1 parent 345fed4 commit 5e84b2f
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,17 @@ public class SchemaRegistryConfig extends RestConfig {
public static final String SCHEMA_SEARCH_MAX_LIMIT_CONFIG = "schema.search.max.limit";
public static final int SCHEMA_SEARCH_MAX_LIMIT_DEFAULT = 1000;

/**
* <code>subject.search.default.limit</code>
*/
public static final String SUBJECT_SEARCH_DEFAULT_LIMIT_CONFIG = "subject.search.default.limit";
public static final int SUBJECT_SEARCH_DEFAULT_LIMIT_DEFAULT = 20000;
/**
* <code>subject.search.max.limit</code>
*/
public static final String SUBJECT_SEARCH_MAX_LIMIT_CONFIG = "subject.search.max.limit";
public static final int SUBJECT_SEARCH_MAX_LIMIT_DEFAULT = 20000;

public static final String METADATA_ENCODER_SECRET_CONFIG = "metadata.encoder.secret";
public static final String METADATA_ENCODER_OLD_SECRET_CONFIG = "metadata.encoder.old.secret";

Expand Down Expand Up @@ -342,6 +353,10 @@ public class SchemaRegistryConfig extends RestConfig {
"The default limit for schema searches.";
protected static final String SCHEMA_SEARCH_MAX_LIMIT_DOC =
"The max limit for schema searches.";
protected static final String SUBJECT_SEARCH_DEFAULT_LIMIT_DOC =
"The default limit for subject searches.";
protected static final String SUBJECT_SEARCH_MAX_LIMIT_DOC =
"The max limit for subject searches.";
protected static final String METADATA_ENCODER_SECRET_DOC =
"The secret used to encrypt and decrypt encoder keysets. "
+ "Use a random string with high entropy.";
Expand Down Expand Up @@ -544,6 +559,14 @@ DEFAULT_KAFKASTORE_WRITE_MAX_RETRIES, atLeast(0),
SCHEMA_SEARCH_MAX_LIMIT_DEFAULT,
ConfigDef.Importance.LOW, SCHEMA_SEARCH_MAX_LIMIT_DOC
)
.define(SUBJECT_SEARCH_DEFAULT_LIMIT_CONFIG, ConfigDef.Type.INT,
SUBJECT_SEARCH_DEFAULT_LIMIT_DEFAULT,
ConfigDef.Importance.LOW, SUBJECT_SEARCH_DEFAULT_LIMIT_DOC
)
.define(SUBJECT_SEARCH_MAX_LIMIT_CONFIG, ConfigDef.Type.INT,
SUBJECT_SEARCH_MAX_LIMIT_DEFAULT,
ConfigDef.Importance.LOW, SUBJECT_SEARCH_MAX_LIMIT_DOC
)
.define(METADATA_ENCODER_SECRET_CONFIG, ConfigDef.Type.PASSWORD, null,
ConfigDef.Importance.HIGH, METADATA_ENCODER_SECRET_DOC
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public List<Schema> getSchemas(
} catch (SchemaRegistryException e) {
throw Errors.schemaRegistryException(errorMessage, e);
}
limit = schemaRegistry.normalizeLimit(limit);
limit = schemaRegistry.normalizeSchemaLimit(limit);
int toIndex = offset + limit;
int index = 0;
while (schemas.hasNext() && index < toIndex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.swagger.v3.oas.annotations.tags.Tag;
import io.swagger.v3.oas.annotations.tags.Tags;
import java.util.HashMap;
import java.util.LinkedHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -61,6 +62,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Path("/subjects")
@Produces({Versions.SCHEMA_REGISTRY_V1_JSON_WEIGHTED,
Expand Down Expand Up @@ -229,6 +232,10 @@ public Set<String> list(
@DefaultValue(QualifiedSubject.CONTEXT_WILDCARD)
@Parameter(description = "Subject name prefix")
@QueryParam("subjectPrefix") String subjectPrefix,
@Parameter(description = "Pagination offset for results")
@DefaultValue("0") @QueryParam("offset") int offset,
@Parameter(description = "Pagination size for results. Ignored if negative")
@DefaultValue("-1") @QueryParam("limit") int limit,
@Parameter(description = "Whether to look up deleted subjects")
@QueryParam("deleted") boolean lookupDeletedSubjects,
@Parameter(description = "Whether to return deleted subjects only")
Expand All @@ -242,8 +249,15 @@ public Set<String> list(
filter = LookupFilter.INCLUDE_DELETED;
}
try {
return schemaRegistry.listSubjectsWithPrefix(
Set<String> subjects = schemaRegistry.listSubjectsWithPrefix(
subjectPrefix != null ? subjectPrefix : QualifiedSubject.CONTEXT_WILDCARD, filter);
Stream<String> stream = subjects.stream();

limit = schemaRegistry.normalizeSubjectLimit(limit);

stream = stream.skip(offset).limit(limit);
return stream.collect(Collectors.toCollection(LinkedHashSet::new)); // preserve order

} catch (SchemaRegistryStoreException e) {
throw Errors.storeException("Error while listing subjects", e);
} catch (SchemaRegistryException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,10 @@ public class KafkaSchemaRegistry implements SchemaRegistry, LeaderAwareSchemaReg
private final int kafkaStoreTimeoutMs;
private final int initTimeout;
private final int kafkaStoreMaxRetries;
private final int searchDefaultLimit;
private final int searchMaxLimit;
private final int schemaSearchDefaultLimit;
private final int schemaSearchMaxLimit;
private final int subjectSearchDefaultLimit;
private final int subjectSearchMaxLimit;
private final boolean isEligibleForLeaderElector;
private final boolean delayLeaderElection;
private final boolean allowModeChanges;
Expand Down Expand Up @@ -218,9 +220,11 @@ public ParsedSchema load(RawSchema s) throws Exception {
return loadSchema(s.getSchema(), s.isNew(), s.isNormalize());
}
});
this.searchDefaultLimit =
config.getInt(SchemaRegistryConfig.SCHEMA_SEARCH_DEFAULT_LIMIT_CONFIG);
this.searchMaxLimit = config.getInt(SchemaRegistryConfig.SCHEMA_SEARCH_MAX_LIMIT_CONFIG);
this.schemaSearchDefaultLimit =
config.getInt(SchemaRegistryConfig.SCHEMA_SEARCH_DEFAULT_LIMIT_CONFIG);
this.schemaSearchMaxLimit = config.getInt(SchemaRegistryConfig.SCHEMA_SEARCH_MAX_LIMIT_CONFIG);
this.subjectSearchDefaultLimit = config.getInt(SchemaRegistryConfig.SUBJECT_SEARCH_DEFAULT_LIMIT_CONFIG);
this.subjectSearchMaxLimit = config.getInt(SchemaRegistryConfig.SUBJECT_SEARCH_MAX_LIMIT_CONFIG);
this.lookupCache = lookupCache();
this.idGenerator = identityGenerator(config);
this.kafkaStore = kafkaStore(config);
Expand Down Expand Up @@ -569,9 +573,17 @@ public SchemaProvider schemaProvider(String schemaType) {
return providers.get(schemaType);
}

public int normalizeLimit(int suppliedLimit) {
int limit = searchDefaultLimit;
if (suppliedLimit > 0 && suppliedLimit <= searchMaxLimit) {
/**
 * Normalizes a caller-supplied page-size limit for schema searches.
 *
 * @param suppliedLimit the limit requested by the caller; honored only when it is
 *     positive and does not exceed the configured maximum
 * @return {@code suppliedLimit} when it lies in {@code (0, schemaSearchMaxLimit]},
 *     otherwise the configured default limit
 */
public int normalizeSchemaLimit(int suppliedLimit) {
  // Out-of-range requests (non-positive or above the max) fall back to the
  // configured default rather than being clamped to the maximum.
  boolean withinBounds = suppliedLimit > 0 && suppliedLimit <= schemaSearchMaxLimit;
  return withinBounds ? suppliedLimit : schemaSearchDefaultLimit;
}

public int normalizeSubjectLimit(int suppliedLimit) {
int limit = subjectSearchDefaultLimit;
if (suppliedLimit > 0 && suppliedLimit <= subjectSearchMaxLimit) {
limit = suppliedLimit;
}
return limit;
Expand Down

0 comments on commit 5e84b2f

Please sign in to comment.