Skip to content

Commit

Permalink
DGS-11414: add pagination for get all subjects (#3455)
Browse files Browse the repository at this point in the history
  • Loading branch information
djibodu authored Jan 13, 2025
1 parent a3d786d commit d8e02d5
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1287,6 +1287,20 @@ public List<String> getAllSubjects(Map<String, String> requestProperties,
return response;
}

public List<String> getAllSubjectsWithPagination(int offset, int limit)
throws IOException, RestClientException {
return getAllSubjectsWithPagination(DEFAULT_REQUEST_PROPERTIES, offset, limit);
}

public List<String> getAllSubjectsWithPagination(Map<String, String> requestProperties,
int offset, int limit)
throws IOException, RestClientException {
String url = "/subjects?limit=" + limit + "&offset=" + offset;
List<String> response = httpRequest(url, "GET", null, requestProperties,
ALL_TOPICS_RESPONSE_TYPE);
return response;
}

public List<String> getDeletedOnlySubjects(String subjectPrefix)
throws IOException, RestClientException {
return getAllSubjects(DEFAULT_REQUEST_PROPERTIES, subjectPrefix, false, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,17 @@ public class SchemaRegistryConfig extends RestConfig {
public static final String SCHEMA_SEARCH_MAX_LIMIT_CONFIG = "schema.search.max.limit";
public static final int SCHEMA_SEARCH_MAX_LIMIT_DEFAULT = 1000;

/**
* <code>subject.search.default.limit</code>
*/
public static final String SUBJECT_SEARCH_DEFAULT_LIMIT_CONFIG = "subject.search.default.limit";
public static final int SUBJECT_SEARCH_DEFAULT_LIMIT_DEFAULT = 20000;
/**
* <code>subject.search.max.limit</code>
*/
public static final String SUBJECT_SEARCH_MAX_LIMIT_CONFIG = "subject.search.max.limit";
public static final int SUBJECT_SEARCH_MAX_LIMIT_DEFAULT = 20000;

public static final String METADATA_ENCODER_SECRET_CONFIG = "metadata.encoder.secret";
public static final String METADATA_ENCODER_OLD_SECRET_CONFIG = "metadata.encoder.old.secret";

Expand Down Expand Up @@ -378,6 +389,10 @@ public class SchemaRegistryConfig extends RestConfig {
"The default limit for schema searches.";
protected static final String SCHEMA_SEARCH_MAX_LIMIT_DOC =
"The max limit for schema searches.";
protected static final String SUBJECT_SEARCH_DEFAULT_LIMIT_DOC =
"The default limit for subject searches.";
protected static final String SUBJECT_SEARCH_MAX_LIMIT_DOC =
"The max limit for subject searches.";
protected static final String METADATA_ENCODER_SECRET_DOC =
"The secret used to encrypt and decrypt encoder keysets. "
+ "Use a random string with high entropy.";
Expand Down Expand Up @@ -599,6 +614,14 @@ DEFAULT_KAFKASTORE_WRITE_MAX_RETRIES, atLeast(0),
SCHEMA_SEARCH_MAX_LIMIT_DEFAULT,
ConfigDef.Importance.LOW, SCHEMA_SEARCH_MAX_LIMIT_DOC
)
.define(SUBJECT_SEARCH_DEFAULT_LIMIT_CONFIG, ConfigDef.Type.INT,
SUBJECT_SEARCH_DEFAULT_LIMIT_DEFAULT,
ConfigDef.Importance.LOW, SUBJECT_SEARCH_DEFAULT_LIMIT_DOC
)
.define(SUBJECT_SEARCH_MAX_LIMIT_CONFIG, ConfigDef.Type.INT,
SUBJECT_SEARCH_MAX_LIMIT_DEFAULT,
ConfigDef.Importance.LOW, SUBJECT_SEARCH_MAX_LIMIT_DOC
)
.define(METADATA_ENCODER_SECRET_CONFIG, ConfigDef.Type.PASSWORD, null,
ConfigDef.Importance.HIGH, METADATA_ENCODER_SECRET_DOC
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package io.confluent.kafka.schemaregistry.rest.resources;

import com.google.common.collect.Streams;
import io.confluent.kafka.schemaregistry.client.rest.Versions;
import io.confluent.kafka.schemaregistry.client.rest.entities.ErrorMessage;
import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
Expand All @@ -35,6 +36,7 @@
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import io.swagger.v3.oas.annotations.tags.Tags;
import java.util.Iterator;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -46,10 +48,10 @@
import javax.ws.rs.DefaultValue;
import javax.ws.rs.QueryParam;
import javax.ws.rs.PathParam;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;


@Path("/schemas")
@Produces({Versions.SCHEMA_REGISTRY_V1_JSON_WEIGHTED,
Expand Down Expand Up @@ -101,7 +103,6 @@ public List<ExtendedSchema> getSchemas(
@Parameter(description = "Pagination size for results. Ignored if negative")
@DefaultValue("-1") @QueryParam("limit") int limit) {
Iterator<ExtendedSchema> schemas;
List<ExtendedSchema> filteredSchemas = new ArrayList<>();
String errorMessage = "Error while getting schemas for prefix " + subjectPrefix;
LookupFilter filter = lookupDeletedSchema ? LookupFilter.INCLUDE_DELETED : LookupFilter.DEFAULT;
try {
Expand All @@ -117,17 +118,11 @@ public List<ExtendedSchema> getSchemas(
} catch (SchemaRegistryException e) {
throw Errors.schemaRegistryException(errorMessage, e);
}
limit = schemaRegistry.normalizeLimit(limit);
int toIndex = offset + limit;
int index = 0;
while (schemas.hasNext() && index < toIndex) {
ExtendedSchema schema = schemas.next();
if (index >= offset) {
filteredSchemas.add(schema);
}
index++;
}
return filteredSchemas;
limit = schemaRegistry.normalizeSchemaLimit(limit);
return Streams.stream(schemas)
.skip(offset)
.limit(limit)
.collect(Collectors.toList());
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.swagger.v3.oas.annotations.tags.Tag;
import io.swagger.v3.oas.annotations.tags.Tags;
import java.util.HashMap;
import java.util.LinkedHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -61,6 +62,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Path("/subjects")
@Produces({Versions.SCHEMA_REGISTRY_V1_JSON_WEIGHTED,
Expand Down Expand Up @@ -229,6 +232,10 @@ public Set<String> list(
@DefaultValue(QualifiedSubject.CONTEXT_WILDCARD)
@Parameter(description = "Subject name prefix")
@QueryParam("subjectPrefix") String subjectPrefix,
@Parameter(description = "Pagination offset for results")
@DefaultValue("0") @QueryParam("offset") int offset,
@Parameter(description = "Pagination size for results. Ignored if negative")
@DefaultValue("-1") @QueryParam("limit") int limit,
@Parameter(description = "Whether to look up deleted subjects")
@QueryParam("deleted") boolean lookupDeletedSubjects,
@Parameter(description = "Whether to return deleted subjects only")
Expand All @@ -242,8 +249,15 @@ public Set<String> list(
filter = LookupFilter.INCLUDE_DELETED;
}
try {
return schemaRegistry.listSubjectsWithPrefix(
subjectPrefix != null ? subjectPrefix : QualifiedSubject.CONTEXT_WILDCARD, filter);
Set<String> subjects = schemaRegistry.listSubjectsWithPrefix(
subjectPrefix != null ? subjectPrefix : QualifiedSubject.CONTEXT_WILDCARD, filter);
Stream<String> stream = subjects.stream();

limit = schemaRegistry.normalizeSubjectLimit(limit);
return stream
.skip(offset)
.limit(limit)
.collect(Collectors.toCollection(LinkedHashSet::new)); // preserve order
} catch (SchemaRegistryStoreException e) {
throw Errors.storeException("Error while listing subjects", e);
} catch (SchemaRegistryException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,10 @@ public class KafkaSchemaRegistry implements SchemaRegistry, LeaderAwareSchemaReg
private final int initTimeout;
private final boolean initWaitForReader;
private final int kafkaStoreMaxRetries;
private final int searchDefaultLimit;
private final int searchMaxLimit;
private final int schemaSearchDefaultLimit;
private final int schemaSearchMaxLimit;
private final int subjectSearchDefaultLimit;
private final int subjectSearchMaxLimit;
private final boolean delayLeaderElection;
private final boolean allowModeChanges;
private final boolean enableStoreHealthCheck;
Expand Down Expand Up @@ -225,9 +227,13 @@ public KafkaSchemaRegistry(SchemaRegistryConfig config,
.expireAfterAccess(config.getInt(SchemaRegistryConfig.SCHEMA_CACHE_EXPIRY_SECS_CONFIG),
TimeUnit.SECONDS)
.build(s -> loadSchema(s.getSchema(), s.isNew(), s.isNormalize()));
this.searchDefaultLimit =
config.getInt(SchemaRegistryConfig.SCHEMA_SEARCH_DEFAULT_LIMIT_CONFIG);
this.searchMaxLimit = config.getInt(SchemaRegistryConfig.SCHEMA_SEARCH_MAX_LIMIT_CONFIG);
this.schemaSearchDefaultLimit =
config.getInt(SchemaRegistryConfig.SCHEMA_SEARCH_DEFAULT_LIMIT_CONFIG);
this.schemaSearchMaxLimit = config.getInt(SchemaRegistryConfig.SCHEMA_SEARCH_MAX_LIMIT_CONFIG);
this.subjectSearchDefaultLimit =
config.getInt(SchemaRegistryConfig.SUBJECT_SEARCH_DEFAULT_LIMIT_CONFIG);
this.subjectSearchMaxLimit =
config.getInt(SchemaRegistryConfig.SUBJECT_SEARCH_MAX_LIMIT_CONFIG);
this.lookupCache = lookupCache();
this.idGenerator = identityGenerator(config);
this.kafkaStore = kafkaStore(config);
Expand Down Expand Up @@ -599,14 +605,22 @@ public SchemaProvider schemaProvider(String schemaType) {
return providers.get(schemaType);
}

public int normalizeLimit(int suppliedLimit) {
int limit = searchDefaultLimit;
if (suppliedLimit > 0 && suppliedLimit <= searchMaxLimit) {
public int normalizeLimit(int suppliedLimit, int defaultLimit, int maxLimit) {
int limit = defaultLimit;
if (suppliedLimit > 0 && suppliedLimit <= maxLimit) {
limit = suppliedLimit;
}
return limit;
}

public int normalizeSchemaLimit(int suppliedLimit) {
return normalizeLimit(suppliedLimit, schemaSearchDefaultLimit, schemaSearchMaxLimit);
}

public int normalizeSubjectLimit(int suppliedLimit) {
return normalizeLimit(suppliedLimit, subjectSearchDefaultLimit, subjectSearchMaxLimit);
}

public Schema register(String subject, RegisterSchemaRequest request, boolean normalize)
throws SchemaRegistryException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,14 @@ public void testBasic() throws Exception {
allSubjects,
restApp.restClient.getAllSubjects());

// test pagination with limit of 1
assertEquals("Getting all subjects with pagination offset=0, limit=1 should return first registered subject",
ImmutableList.of(allSubjects.get(0)),
restApp.restClient.getAllSubjectsWithPagination(0, 1));
assertEquals("Getting all subjects with pagination offset=1, limit=1 should return second registered subject",
ImmutableList.of(allSubjects.get(1)),
restApp.restClient.getAllSubjectsWithPagination(1, 1));

List<Schema> latestSchemas = restApp.restClient.getSchemas(null, false, true);
assertEquals("Getting latest schemas should return two schemas",
2,
Expand Down

0 comments on commit d8e02d5

Please sign in to comment.