Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DGS-11414: add pagination for get all subjects #3455

Merged
merged 5 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1287,6 +1287,20 @@ public List<String> getAllSubjects(Map<String, String> requestProperties,
return response;
}

/**
 * Fetches a page of subject names using the default request properties.
 *
 * <p>Delegates to {@link #getAllSubjectsWithPagination(Map, int, int)} with
 * {@code DEFAULT_REQUEST_PROPERTIES}.
 *
 * @param offset number of subjects to skip before the returned page
 * @param limit maximum number of subjects to return
 * @return the requested page of subject names
 * @throws IOException if the HTTP request fails
 * @throws RestClientException if the registry returns an error response
 */
public List<String> getAllSubjectsWithPagination(int offset, int limit)
    throws IOException, RestClientException {
  return getAllSubjectsWithPagination(DEFAULT_REQUEST_PROPERTIES, offset, limit);
}

/**
 * Fetches a page of subject names from the {@code /subjects} endpoint.
 *
 * <p>The {@code limit} and {@code offset} values are passed through as query
 * parameters; server-side handling of negative or oversized values applies.
 *
 * @param requestProperties HTTP headers/properties to send with the request
 * @param offset number of subjects to skip before the returned page
 * @param limit maximum number of subjects to return
 * @return the requested page of subject names
 * @throws IOException if the HTTP request fails
 * @throws RestClientException if the registry returns an error response
 */
public List<String> getAllSubjectsWithPagination(Map<String, String> requestProperties,
                                                 int offset, int limit)
    throws IOException, RestClientException {
  String url = String.format("/subjects?limit=%d&offset=%d", limit, offset);
  return httpRequest(url, "GET", null, requestProperties, ALL_TOPICS_RESPONSE_TYPE);
}

public List<String> getDeletedOnlySubjects(String subjectPrefix)
throws IOException, RestClientException {
return getAllSubjects(DEFAULT_REQUEST_PROPERTIES, subjectPrefix, false, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,17 @@ public class SchemaRegistryConfig extends RestConfig {
public static final String SCHEMA_SEARCH_MAX_LIMIT_CONFIG = "schema.search.max.limit";
public static final int SCHEMA_SEARCH_MAX_LIMIT_DEFAULT = 1000;

/**
 * <code>subject.search.default.limit</code>
 *
 * <p>Default page size applied to subject listings when the caller does not
 * supply a positive {@code limit}.
 */
public static final String SUBJECT_SEARCH_DEFAULT_LIMIT_CONFIG = "subject.search.default.limit";
public static final int SUBJECT_SEARCH_DEFAULT_LIMIT_DEFAULT = 20000;
/**
 * <code>subject.search.max.limit</code>
 *
 * <p>Upper bound on the caller-supplied {@code limit} for subject listings;
 * larger values fall back to the default limit.
 */
public static final String SUBJECT_SEARCH_MAX_LIMIT_CONFIG = "subject.search.max.limit";
public static final int SUBJECT_SEARCH_MAX_LIMIT_DEFAULT = 20000;

public static final String METADATA_ENCODER_SECRET_CONFIG = "metadata.encoder.secret";
public static final String METADATA_ENCODER_OLD_SECRET_CONFIG = "metadata.encoder.old.secret";

Expand Down Expand Up @@ -378,6 +389,10 @@ public class SchemaRegistryConfig extends RestConfig {
"The default limit for schema searches.";
protected static final String SCHEMA_SEARCH_MAX_LIMIT_DOC =
"The max limit for schema searches.";
protected static final String SUBJECT_SEARCH_DEFAULT_LIMIT_DOC =
"The default limit for subject searches.";
protected static final String SUBJECT_SEARCH_MAX_LIMIT_DOC =
"The max limit for subject searches.";
protected static final String METADATA_ENCODER_SECRET_DOC =
"The secret used to encrypt and decrypt encoder keysets. "
+ "Use a random string with high entropy.";
Expand Down Expand Up @@ -599,6 +614,14 @@ DEFAULT_KAFKASTORE_WRITE_MAX_RETRIES, atLeast(0),
SCHEMA_SEARCH_MAX_LIMIT_DEFAULT,
ConfigDef.Importance.LOW, SCHEMA_SEARCH_MAX_LIMIT_DOC
)
.define(SUBJECT_SEARCH_DEFAULT_LIMIT_CONFIG, ConfigDef.Type.INT,
SUBJECT_SEARCH_DEFAULT_LIMIT_DEFAULT,
ConfigDef.Importance.LOW, SUBJECT_SEARCH_DEFAULT_LIMIT_DOC
)
.define(SUBJECT_SEARCH_MAX_LIMIT_CONFIG, ConfigDef.Type.INT,
SUBJECT_SEARCH_MAX_LIMIT_DEFAULT,
ConfigDef.Importance.LOW, SUBJECT_SEARCH_MAX_LIMIT_DOC
)
.define(METADATA_ENCODER_SECRET_CONFIG, ConfigDef.Type.PASSWORD, null,
ConfigDef.Importance.HIGH, METADATA_ENCODER_SECRET_DOC
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package io.confluent.kafka.schemaregistry.rest.resources;

import com.google.common.collect.Streams;
import io.confluent.kafka.schemaregistry.client.rest.Versions;
import io.confluent.kafka.schemaregistry.client.rest.entities.ErrorMessage;
import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
Expand All @@ -35,6 +36,7 @@
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import io.swagger.v3.oas.annotations.tags.Tags;
import java.util.Iterator;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -46,10 +48,10 @@
import javax.ws.rs.DefaultValue;
import javax.ws.rs.QueryParam;
import javax.ws.rs.PathParam;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;


@Path("/schemas")
@Produces({Versions.SCHEMA_REGISTRY_V1_JSON_WEIGHTED,
Expand Down Expand Up @@ -101,7 +103,6 @@ public List<ExtendedSchema> getSchemas(
@Parameter(description = "Pagination size for results. Ignored if negative")
@DefaultValue("-1") @QueryParam("limit") int limit) {
Iterator<ExtendedSchema> schemas;
List<ExtendedSchema> filteredSchemas = new ArrayList<>();
String errorMessage = "Error while getting schemas for prefix " + subjectPrefix;
LookupFilter filter = lookupDeletedSchema ? LookupFilter.INCLUDE_DELETED : LookupFilter.DEFAULT;
try {
Expand All @@ -117,17 +118,11 @@ public List<ExtendedSchema> getSchemas(
} catch (SchemaRegistryException e) {
throw Errors.schemaRegistryException(errorMessage, e);
}
limit = schemaRegistry.normalizeLimit(limit);
int toIndex = offset + limit;
int index = 0;
while (schemas.hasNext() && index < toIndex) {
ExtendedSchema schema = schemas.next();
if (index >= offset) {
filteredSchemas.add(schema);
}
index++;
}
return filteredSchemas;
limit = schemaRegistry.normalizeSchemaLimit(limit);
return Streams.stream(schemas)
.skip(offset)
.limit(limit)
.collect(Collectors.toList());
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.swagger.v3.oas.annotations.tags.Tag;
import io.swagger.v3.oas.annotations.tags.Tags;
import java.util.HashMap;
import java.util.LinkedHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -61,6 +62,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Path("/subjects")
@Produces({Versions.SCHEMA_REGISTRY_V1_JSON_WEIGHTED,
Expand Down Expand Up @@ -229,6 +232,10 @@ public Set<String> list(
@DefaultValue(QualifiedSubject.CONTEXT_WILDCARD)
@Parameter(description = "Subject name prefix")
@QueryParam("subjectPrefix") String subjectPrefix,
@Parameter(description = "Pagination offset for results")
@DefaultValue("0") @QueryParam("offset") int offset,
@Parameter(description = "Pagination size for results. Ignored if negative")
@DefaultValue("-1") @QueryParam("limit") int limit,
@Parameter(description = "Whether to look up deleted subjects")
@QueryParam("deleted") boolean lookupDeletedSubjects,
@Parameter(description = "Whether to return deleted subjects only")
Expand All @@ -242,8 +249,15 @@ public Set<String> list(
filter = LookupFilter.INCLUDE_DELETED;
}
try {
return schemaRegistry.listSubjectsWithPrefix(
subjectPrefix != null ? subjectPrefix : QualifiedSubject.CONTEXT_WILDCARD, filter);
Set<String> subjects = schemaRegistry.listSubjectsWithPrefix(
subjectPrefix != null ? subjectPrefix : QualifiedSubject.CONTEXT_WILDCARD, filter);
Stream<String> stream = subjects.stream();

limit = schemaRegistry.normalizeSubjectLimit(limit);
return stream
.skip(offset)
.limit(limit)
.collect(Collectors.toCollection(LinkedHashSet::new)); // preserve order
} catch (SchemaRegistryStoreException e) {
throw Errors.storeException("Error while listing subjects", e);
} catch (SchemaRegistryException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,10 @@ public class KafkaSchemaRegistry implements SchemaRegistry, LeaderAwareSchemaReg
private final int initTimeout;
private final boolean initWaitForReader;
private final int kafkaStoreMaxRetries;
private final int searchDefaultLimit;
private final int searchMaxLimit;
private final int schemaSearchDefaultLimit;
private final int schemaSearchMaxLimit;
private final int subjectSearchDefaultLimit;
private final int subjectSearchMaxLimit;
private final boolean delayLeaderElection;
private final boolean allowModeChanges;
private final boolean enableStoreHealthCheck;
Expand Down Expand Up @@ -225,9 +227,13 @@ public KafkaSchemaRegistry(SchemaRegistryConfig config,
.expireAfterAccess(config.getInt(SchemaRegistryConfig.SCHEMA_CACHE_EXPIRY_SECS_CONFIG),
TimeUnit.SECONDS)
.build(s -> loadSchema(s.getSchema(), s.isNew(), s.isNormalize()));
this.searchDefaultLimit =
config.getInt(SchemaRegistryConfig.SCHEMA_SEARCH_DEFAULT_LIMIT_CONFIG);
this.searchMaxLimit = config.getInt(SchemaRegistryConfig.SCHEMA_SEARCH_MAX_LIMIT_CONFIG);
this.schemaSearchDefaultLimit =
config.getInt(SchemaRegistryConfig.SCHEMA_SEARCH_DEFAULT_LIMIT_CONFIG);
this.schemaSearchMaxLimit = config.getInt(SchemaRegistryConfig.SCHEMA_SEARCH_MAX_LIMIT_CONFIG);
this.subjectSearchDefaultLimit =
config.getInt(SchemaRegistryConfig.SUBJECT_SEARCH_DEFAULT_LIMIT_CONFIG);
this.subjectSearchMaxLimit =
config.getInt(SchemaRegistryConfig.SUBJECT_SEARCH_MAX_LIMIT_CONFIG);
this.lookupCache = lookupCache();
this.idGenerator = identityGenerator(config);
this.kafkaStore = kafkaStore(config);
Expand Down Expand Up @@ -599,14 +605,22 @@ public SchemaProvider schemaProvider(String schemaType) {
return providers.get(schemaType);
}

public int normalizeLimit(int suppliedLimit) {
int limit = searchDefaultLimit;
if (suppliedLimit > 0 && suppliedLimit <= searchMaxLimit) {
/**
 * Resolves a caller-supplied pagination limit against configured bounds.
 *
 * <p>A supplied limit is honored only when it is positive and does not exceed
 * {@code maxLimit}; otherwise {@code defaultLimit} is returned. Note that an
 * out-of-range value falls back to the default rather than being clamped.
 *
 * @param suppliedLimit limit requested by the caller (may be non-positive)
 * @param defaultLimit limit to use when the supplied value is unusable
 * @param maxLimit largest caller-supplied limit that will be honored
 * @return the effective limit to apply
 */
public int normalizeLimit(int suppliedLimit, int defaultLimit, int maxLimit) {
  boolean usable = suppliedLimit > 0 && suppliedLimit <= maxLimit;
  return usable ? suppliedLimit : defaultLimit;
}

/**
 * Resolves a caller-supplied schema search limit against the configured
 * schema search default and maximum limits.
 *
 * @param suppliedLimit limit requested by the caller (may be non-positive)
 * @return the effective schema search limit
 */
public int normalizeSchemaLimit(int suppliedLimit) {
  return normalizeLimit(suppliedLimit, schemaSearchDefaultLimit, schemaSearchMaxLimit);
}

/**
 * Resolves a caller-supplied subject search limit against the configured
 * subject search default and maximum limits.
 *
 * @param suppliedLimit limit requested by the caller (may be non-positive)
 * @return the effective subject search limit
 */
public int normalizeSubjectLimit(int suppliedLimit) {
  return normalizeLimit(suppliedLimit, subjectSearchDefaultLimit, subjectSearchMaxLimit);
}

public Schema register(String subject, RegisterSchemaRequest request, boolean normalize)
throws SchemaRegistryException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,14 @@ public void testBasic() throws Exception {
allSubjects,
restApp.restClient.getAllSubjects());

// test pagination with limit of 1
assertEquals("Getting all subjects with pagination offset=0, limit=1 should return first registered subject",
ImmutableList.of(allSubjects.get(0)),
restApp.restClient.getAllSubjectsWithPagination(0, 1));
assertEquals("Getting all subjects with pagination offset=1, limit=1 should return second registered subject",
ImmutableList.of(allSubjects.get(1)),
restApp.restClient.getAllSubjectsWithPagination(1, 1));

List<Schema> latestSchemas = restApp.restClient.getSchemas(null, false, true);
assertEquals("Getting latest schemas should return two schemas",
2,
Expand Down