Skip to content

Commit

Permalink
schema resource changes
Browse files Browse the repository at this point in the history
  • Loading branch information
djibodu committed Jan 6, 2025
1 parent 8be282f commit 692e7a0
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,7 @@ public synchronized List<ParsedSchema> getSchemas(
if (!DEFAULT_TENANT.equals(schemaRegistry.tenant())) {
subjectPrefix = schemaRegistry.tenant() + TENANT_DELIMITER + subjectPrefix;
}
Iterator<ExtendedSchema> schemas = null;
List<ParsedSchema> result = new ArrayList<>();
List<ExtendedSchema> schemas;
String errorMessage = "Error while getting schemas for prefix " + subjectPrefix;
try {
schemas = schemaRegistry.getVersionsWithSubjectPrefix(
Expand All @@ -274,11 +273,9 @@ public synchronized List<ParsedSchema> getSchemas(
} catch (SchemaRegistryException e) {
throw Errors.schemaRegistryException(errorMessage, e);
}
while (schemas.hasNext()) {
Schema s = schemas.next();
result.add(parseSchema(s).get());
}
return result;
return schemas.stream()
.map(schema -> parseSchema(schema).get())
.collect(Collectors.toList());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,9 @@
import javax.ws.rs.DefaultValue;
import javax.ws.rs.QueryParam;
import javax.ws.rs.PathParam;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

@Path("/schemas")
@Produces({Versions.SCHEMA_REGISTRY_V1_JSON_WEIGHTED,
Expand Down Expand Up @@ -100,8 +99,7 @@ public List<ExtendedSchema> getSchemas(
@DefaultValue("0") @QueryParam("offset") int offset,
@Parameter(description = "Pagination size for results. Ignored if negative")
@DefaultValue("-1") @QueryParam("limit") int limit) {
Iterator<ExtendedSchema> schemas;
List<ExtendedSchema> filteredSchemas = new ArrayList<>();
List<ExtendedSchema> schemas;
String errorMessage = "Error while getting schemas for prefix " + subjectPrefix;
LookupFilter filter = lookupDeletedSchema ? LookupFilter.INCLUDE_DELETED : LookupFilter.DEFAULT;
try {
Expand All @@ -118,16 +116,10 @@ public List<ExtendedSchema> getSchemas(
throw Errors.schemaRegistryException(errorMessage, e);
}
limit = schemaRegistry.normalizeSchemaLimit(limit);
int toIndex = offset + limit;
int index = 0;
while (schemas.hasNext() && index < toIndex) {
ExtendedSchema schema = schemas.next();
if (index >= offset) {
filteredSchemas.add(schema);
}
index++;
}
return filteredSchemas;
return schemas.stream()
.skip(offset)
.limit(limit)
.collect(Collectors.toList());
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,10 @@ public Set<String> list(
Stream<String> stream = subjects.stream();

limit = schemaRegistry.normalizeSubjectLimit(limit);

stream = stream.skip(offset).limit(limit);
return stream.collect(Collectors.toCollection(LinkedHashSet::new)); // preserve order
return stream
.skip(offset)
.limit(limit)
.collect(Collectors.toCollection(LinkedHashSet::new)); // preserve order
} catch (SchemaRegistryStoreException e) {
throw Errors.storeException("Error while listing subjects", e);
} catch (SchemaRegistryException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -605,20 +605,20 @@ public SchemaProvider schemaProvider(String schemaType) {
return providers.get(schemaType);
}

public int normalizeSchemaLimit(int suppliedLimit) {
int limit = schemaSearchDefaultLimit;
if (suppliedLimit > 0 && suppliedLimit <= schemaSearchMaxLimit) {
public int normalizeLimit(int suppliedLimit, int defaultLimit, int maxLimit) {
int limit = defaultLimit;
if (suppliedLimit > 0 && suppliedLimit <= maxLimit) {
limit = suppliedLimit;
}
return limit;
}

public int normalizeSchemaLimit(int suppliedLimit) {
return normalizeLimit(suppliedLimit, schemaSearchDefaultLimit, schemaSearchMaxLimit);
}

public int normalizeSubjectLimit(int suppliedLimit) {
int limit = subjectSearchDefaultLimit;
if (suppliedLimit > 0 && suppliedLimit <= subjectSearchMaxLimit) {
limit = suppliedLimit;
}
return limit;
return normalizeLimit(suppliedLimit, subjectSearchDefaultLimit, subjectSearchMaxLimit);
}

public Schema register(String subject, RegisterSchemaRequest request, boolean normalize)
Expand Down Expand Up @@ -1921,7 +1921,7 @@ public Iterator<SchemaKey> getAllVersions(String subject, LookupFilter filter)
}

@Override
public Iterator<ExtendedSchema> getVersionsWithSubjectPrefix(String prefix,
public List<ExtendedSchema> getVersionsWithSubjectPrefix(String prefix,
boolean includeAliases,
LookupFilter filter,
boolean returnLatestOnly,
Expand All @@ -1935,7 +1935,7 @@ public Iterator<ExtendedSchema> getVersionsWithSubjectPrefix(String prefix,
}
}

private Iterator<ExtendedSchema> allVersionsWithSubjectPrefix(String prefix,
private List<ExtendedSchema> allVersionsWithSubjectPrefix(String prefix,
LookupFilter filter,
boolean returnLatestOnly,
Predicate<Schema> postFilter)
Expand All @@ -1948,11 +1948,11 @@ private Iterator<ExtendedSchema> allVersionsWithSubjectPrefix(String prefix,
result.addAll(schemaList);
}
Collections.sort(result);
return result.iterator();
return result;
}
}

public Iterator<ExtendedSchema> allVersionsIncludingAliasesWithSubjectPrefix(String prefix,
public List<ExtendedSchema> allVersionsIncludingAliasesWithSubjectPrefix(String prefix,
LookupFilter filter,
boolean returnLatestOnly,
Predicate<Schema> postFilter)
Expand All @@ -1979,7 +1979,7 @@ public Iterator<ExtendedSchema> allVersionsIncludingAliasesWithSubjectPrefix(Str
}
}
Collections.sort(result);
return result.iterator();
return result;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ Set<String> listSubjectsForId(int id, String subject, boolean returnDeleted)
Iterator<SchemaKey> getAllVersions(String subject, LookupFilter filter)
throws SchemaRegistryException;

Iterator<ExtendedSchema> getVersionsWithSubjectPrefix(
List<ExtendedSchema> getVersionsWithSubjectPrefix(
String prefix, boolean includeAliases, LookupFilter filter,
boolean latestOnly, Predicate<Schema> postFilter)
throws SchemaRegistryException;
Expand Down

0 comments on commit 692e7a0

Please sign in to comment.