Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DGS-11414: add pagination for get all subjects #3455

Merged
merged 5 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1287,6 +1287,19 @@ public List<String> getAllSubjects(Map<String, String> requestProperties,
return response;
}

public List<String> getAllSubjectsWithPagination(int limit)
djibodu marked this conversation as resolved.
Show resolved Hide resolved
throws IOException, RestClientException {
return getAllSubjectsWithPagination(DEFAULT_REQUEST_PROPERTIES, limit);
}

public List<String> getAllSubjectsWithPagination(Map<String, String> requestProperties, int limit)
throws IOException, RestClientException {
String url = "/subjects?limit=" + limit;
List<String> response = httpRequest(url, "GET", null, requestProperties,
ALL_TOPICS_RESPONSE_TYPE);
return response;
}

public List<String> getDeletedOnlySubjects(String subjectPrefix)
throws IOException, RestClientException {
return getAllSubjects(DEFAULT_REQUEST_PROPERTIES, subjectPrefix, false, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,17 @@ public class SchemaRegistryConfig extends RestConfig {
public static final String SCHEMA_SEARCH_MAX_LIMIT_CONFIG = "schema.search.max.limit";
public static final int SCHEMA_SEARCH_MAX_LIMIT_DEFAULT = 1000;

/**
* <code>subject.search.default.limit</code>
*/
public static final String SUBJECT_SEARCH_DEFAULT_LIMIT_CONFIG = "subject.search.default.limit";
public static final int SUBJECT_SEARCH_DEFAULT_LIMIT_DEFAULT = 20000;
/**
* <code>subject.search.max.limit</code>
*/
public static final String SUBJECT_SEARCH_MAX_LIMIT_CONFIG = "subject.search.max.limit";
public static final int SUBJECT_SEARCH_MAX_LIMIT_DEFAULT = 20000;

public static final String METADATA_ENCODER_SECRET_CONFIG = "metadata.encoder.secret";
public static final String METADATA_ENCODER_OLD_SECRET_CONFIG = "metadata.encoder.old.secret";

Expand Down Expand Up @@ -378,6 +389,10 @@ public class SchemaRegistryConfig extends RestConfig {
"The default limit for schema searches.";
protected static final String SCHEMA_SEARCH_MAX_LIMIT_DOC =
"The max limit for schema searches.";
protected static final String SUBJECT_SEARCH_DEFAULT_LIMIT_DOC =
"The default limit for subject searches.";
protected static final String SUBJECT_SEARCH_MAX_LIMIT_DOC =
"The max limit for subject searches.";
protected static final String METADATA_ENCODER_SECRET_DOC =
"The secret used to encrypt and decrypt encoder keysets. "
+ "Use a random string with high entropy.";
Expand Down Expand Up @@ -599,6 +614,14 @@ DEFAULT_KAFKASTORE_WRITE_MAX_RETRIES, atLeast(0),
SCHEMA_SEARCH_MAX_LIMIT_DEFAULT,
ConfigDef.Importance.LOW, SCHEMA_SEARCH_MAX_LIMIT_DOC
)
.define(SUBJECT_SEARCH_DEFAULT_LIMIT_CONFIG, ConfigDef.Type.INT,
SUBJECT_SEARCH_DEFAULT_LIMIT_DEFAULT,
ConfigDef.Importance.LOW, SUBJECT_SEARCH_DEFAULT_LIMIT_DOC
)
.define(SUBJECT_SEARCH_MAX_LIMIT_CONFIG, ConfigDef.Type.INT,
SUBJECT_SEARCH_MAX_LIMIT_DEFAULT,
ConfigDef.Importance.LOW, SUBJECT_SEARCH_MAX_LIMIT_DOC
)
.define(METADATA_ENCODER_SECRET_CONFIG, ConfigDef.Type.PASSWORD, null,
ConfigDef.Importance.HIGH, METADATA_ENCODER_SECRET_DOC
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,7 @@ public synchronized List<ParsedSchema> getSchemas(
if (!DEFAULT_TENANT.equals(schemaRegistry.tenant())) {
subjectPrefix = schemaRegistry.tenant() + TENANT_DELIMITER + subjectPrefix;
}
Iterator<ExtendedSchema> schemas = null;
List<ParsedSchema> result = new ArrayList<>();
List<ExtendedSchema> schemas;
String errorMessage = "Error while getting schemas for prefix " + subjectPrefix;
try {
schemas = schemaRegistry.getVersionsWithSubjectPrefix(
Expand All @@ -274,11 +273,9 @@ public synchronized List<ParsedSchema> getSchemas(
} catch (SchemaRegistryException e) {
throw Errors.schemaRegistryException(errorMessage, e);
}
while (schemas.hasNext()) {
Schema s = schemas.next();
result.add(parseSchema(s).get());
}
return result;
return schemas.stream()
djibodu marked this conversation as resolved.
Show resolved Hide resolved
.map(schema -> parseSchema(schema).get())
.collect(Collectors.toList());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,9 @@
import javax.ws.rs.DefaultValue;
import javax.ws.rs.QueryParam;
import javax.ws.rs.PathParam;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

@Path("/schemas")
@Produces({Versions.SCHEMA_REGISTRY_V1_JSON_WEIGHTED,
Expand Down Expand Up @@ -100,8 +99,7 @@ public List<ExtendedSchema> getSchemas(
@DefaultValue("0") @QueryParam("offset") int offset,
@Parameter(description = "Pagination size for results. Ignored if negative")
@DefaultValue("-1") @QueryParam("limit") int limit) {
Iterator<ExtendedSchema> schemas;
List<ExtendedSchema> filteredSchemas = new ArrayList<>();
List<ExtendedSchema> schemas;
String errorMessage = "Error while getting schemas for prefix " + subjectPrefix;
LookupFilter filter = lookupDeletedSchema ? LookupFilter.INCLUDE_DELETED : LookupFilter.DEFAULT;
try {
Expand All @@ -117,17 +115,11 @@ public List<ExtendedSchema> getSchemas(
} catch (SchemaRegistryException e) {
throw Errors.schemaRegistryException(errorMessage, e);
}
limit = schemaRegistry.normalizeLimit(limit);
int toIndex = offset + limit;
int index = 0;
while (schemas.hasNext() && index < toIndex) {
ExtendedSchema schema = schemas.next();
if (index >= offset) {
filteredSchemas.add(schema);
}
index++;
}
return filteredSchemas;
limit = schemaRegistry.normalizeSchemaLimit(limit);
return schemas.stream()
.skip(offset)
.limit(limit)
.collect(Collectors.toList());
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.swagger.v3.oas.annotations.tags.Tag;
import io.swagger.v3.oas.annotations.tags.Tags;
import java.util.HashMap;
import java.util.LinkedHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -61,6 +62,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Path("/subjects")
@Produces({Versions.SCHEMA_REGISTRY_V1_JSON_WEIGHTED,
Expand Down Expand Up @@ -229,6 +232,10 @@ public Set<String> list(
@DefaultValue(QualifiedSubject.CONTEXT_WILDCARD)
@Parameter(description = "Subject name prefix")
@QueryParam("subjectPrefix") String subjectPrefix,
@Parameter(description = "Pagination offset for results")
@DefaultValue("0") @QueryParam("offset") int offset,
@Parameter(description = "Pagination size for results. Ignored if negative")
@DefaultValue("-1") @QueryParam("limit") int limit,
@Parameter(description = "Whether to look up deleted subjects")
@QueryParam("deleted") boolean lookupDeletedSubjects,
@Parameter(description = "Whether to return deleted subjects only")
Expand All @@ -242,8 +249,15 @@ public Set<String> list(
filter = LookupFilter.INCLUDE_DELETED;
}
try {
return schemaRegistry.listSubjectsWithPrefix(
subjectPrefix != null ? subjectPrefix : QualifiedSubject.CONTEXT_WILDCARD, filter);
Set<String> subjects = schemaRegistry.listSubjectsWithPrefix(
subjectPrefix != null ? subjectPrefix : QualifiedSubject.CONTEXT_WILDCARD, filter);
Stream<String> stream = subjects.stream();

limit = schemaRegistry.normalizeSubjectLimit(limit);
return stream
.skip(offset)
.limit(limit)
.collect(Collectors.toCollection(LinkedHashSet::new)); // preserve order
} catch (SchemaRegistryStoreException e) {
throw Errors.storeException("Error while listing subjects", e);
} catch (SchemaRegistryException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,10 @@ public class KafkaSchemaRegistry implements SchemaRegistry, LeaderAwareSchemaReg
private final int initTimeout;
private final boolean initWaitForReader;
private final int kafkaStoreMaxRetries;
private final int searchDefaultLimit;
private final int searchMaxLimit;
private final int schemaSearchDefaultLimit;
private final int schemaSearchMaxLimit;
private final int subjectSearchDefaultLimit;
private final int subjectSearchMaxLimit;
private final boolean delayLeaderElection;
private final boolean allowModeChanges;
private final boolean enableStoreHealthCheck;
Expand Down Expand Up @@ -225,9 +227,13 @@ public KafkaSchemaRegistry(SchemaRegistryConfig config,
.expireAfterAccess(config.getInt(SchemaRegistryConfig.SCHEMA_CACHE_EXPIRY_SECS_CONFIG),
TimeUnit.SECONDS)
.build(s -> loadSchema(s.getSchema(), s.isNew(), s.isNormalize()));
this.searchDefaultLimit =
config.getInt(SchemaRegistryConfig.SCHEMA_SEARCH_DEFAULT_LIMIT_CONFIG);
this.searchMaxLimit = config.getInt(SchemaRegistryConfig.SCHEMA_SEARCH_MAX_LIMIT_CONFIG);
this.schemaSearchDefaultLimit =
config.getInt(SchemaRegistryConfig.SCHEMA_SEARCH_DEFAULT_LIMIT_CONFIG);
this.schemaSearchMaxLimit = config.getInt(SchemaRegistryConfig.SCHEMA_SEARCH_MAX_LIMIT_CONFIG);
this.subjectSearchDefaultLimit =
config.getInt(SchemaRegistryConfig.SUBJECT_SEARCH_DEFAULT_LIMIT_CONFIG);
this.subjectSearchMaxLimit =
config.getInt(SchemaRegistryConfig.SUBJECT_SEARCH_MAX_LIMIT_CONFIG);
this.lookupCache = lookupCache();
this.idGenerator = identityGenerator(config);
this.kafkaStore = kafkaStore(config);
Expand Down Expand Up @@ -599,14 +605,22 @@ public SchemaProvider schemaProvider(String schemaType) {
return providers.get(schemaType);
}

public int normalizeLimit(int suppliedLimit) {
int limit = searchDefaultLimit;
if (suppliedLimit > 0 && suppliedLimit <= searchMaxLimit) {
public int normalizeLimit(int suppliedLimit, int defaultLimit, int maxLimit) {
int limit = defaultLimit;
if (suppliedLimit > 0 && suppliedLimit <= maxLimit) {
limit = suppliedLimit;
}
return limit;
}

public int normalizeSchemaLimit(int suppliedLimit) {
return normalizeLimit(suppliedLimit, schemaSearchDefaultLimit, schemaSearchMaxLimit);
}

public int normalizeSubjectLimit(int suppliedLimit) {
return normalizeLimit(suppliedLimit, subjectSearchDefaultLimit, subjectSearchMaxLimit);
}

public Schema register(String subject, RegisterSchemaRequest request, boolean normalize)
throws SchemaRegistryException {
try {
Expand Down Expand Up @@ -1907,7 +1921,7 @@ public Iterator<SchemaKey> getAllVersions(String subject, LookupFilter filter)
}

@Override
public Iterator<ExtendedSchema> getVersionsWithSubjectPrefix(String prefix,
public List<ExtendedSchema> getVersionsWithSubjectPrefix(String prefix,
djibodu marked this conversation as resolved.
Show resolved Hide resolved
boolean includeAliases,
LookupFilter filter,
boolean returnLatestOnly,
Expand All @@ -1921,7 +1935,7 @@ public Iterator<ExtendedSchema> getVersionsWithSubjectPrefix(String prefix,
}
}

private Iterator<ExtendedSchema> allVersionsWithSubjectPrefix(String prefix,
private List<ExtendedSchema> allVersionsWithSubjectPrefix(String prefix,
djibodu marked this conversation as resolved.
Show resolved Hide resolved
LookupFilter filter,
boolean returnLatestOnly,
Predicate<Schema> postFilter)
Expand All @@ -1934,11 +1948,11 @@ private Iterator<ExtendedSchema> allVersionsWithSubjectPrefix(String prefix,
result.addAll(schemaList);
}
Collections.sort(result);
return result.iterator();
return result;
}
}

public Iterator<ExtendedSchema> allVersionsIncludingAliasesWithSubjectPrefix(String prefix,
public List<ExtendedSchema> allVersionsIncludingAliasesWithSubjectPrefix(String prefix,
djibodu marked this conversation as resolved.
Show resolved Hide resolved
LookupFilter filter,
boolean returnLatestOnly,
Predicate<Schema> postFilter)
Expand All @@ -1965,7 +1979,7 @@ public Iterator<ExtendedSchema> allVersionsIncludingAliasesWithSubjectPrefix(Str
}
}
Collections.sort(result);
return result.iterator();
return result;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ Set<String> listSubjectsForId(int id, String subject, boolean returnDeleted)
Iterator<SchemaKey> getAllVersions(String subject, LookupFilter filter)
throws SchemaRegistryException;

Iterator<ExtendedSchema> getVersionsWithSubjectPrefix(
List<ExtendedSchema> getVersionsWithSubjectPrefix(
djibodu marked this conversation as resolved.
Show resolved Hide resolved
String prefix, boolean includeAliases, LookupFilter filter,
boolean latestOnly, Predicate<Schema> postFilter)
throws SchemaRegistryException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ public void testBasic() throws Exception {
allSubjects,
restApp.restClient.getAllSubjects());

// test pagination with limit of 1
assertEquals("Getting all subjects with pagination limit=1 should return first registered subject",
ImmutableList.of(allSubjects.get(0)),
restApp.restClient.getAllSubjectsWithPagination(1));

List<Schema> latestSchemas = restApp.restClient.getSchemas(null, false, true);
assertEquals("Getting latest schemas should return two schemas",
2,
Expand Down