Skip to content

Commit

Permalink
Proof Of Concept download specific subject version
Browse files Browse the repository at this point in the history
  • Loading branch information
danielpetisme committed Dec 9, 2020
1 parent 968b050 commit f1b5095
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@Mojo(name = "download")
public class DownloadSchemaRegistryMojo extends SchemaRegistryMojo {
Expand All @@ -54,30 +52,39 @@ public class DownloadSchemaRegistryMojo extends SchemaRegistryMojo {
@Parameter(required = true)
File outputDirectory;

Map<String, ParsedSchema> downloadSchemas(Collection<String> subjects)
Map<String, ParsedSchema> downloadSchemas(Collection<SubjectVersion> subjectVersions)
throws MojoExecutionException {
Map<String, ParsedSchema> results = new LinkedHashMap<>();

for (String subject : subjects) {
for (SubjectVersion subjectVersion : subjectVersions) {
SchemaMetadata schemaMetadata;
try {
getLog().info(String.format("Downloading latest metadata for %s.", subject));
schemaMetadata = this.client().getLatestSchemaMetadata(subject);
if(subjectVersion.version == -1) {
getLog().info(String.format("Downloading latest metadata for %s.", subjectVersion));
schemaMetadata = this.client().getLatestSchemaMetadata(subjectVersion.subject);
} else {
getLog().info(String.format("Downloading latest metadata for %s.", subjectVersion));
schemaMetadata = this.client().getSchemaMetadata(subjectVersion.subject, subjectVersion.version);
}
Optional<ParsedSchema> schema =
this.client().parseSchema(
schemaMetadata.getSchemaType(),
schemaMetadata.getSchema(),
schemaMetadata.getReferences());
if (schema.isPresent()) {
results.put(subject, schema.get());
results.put(subjectVersion.subject, schema.get());
} else {
throw new MojoExecutionException(
String.format("Error while parsing schema for %s", subject)
String.format("Error while parsing schema for %s", subjectVersion.subject)
);
}
} catch (Exception ex) {
throw new MojoExecutionException(
String.format("Exception thrown while downloading metadata for %s.", subject),
String.format(
"Exception thrown while downloading metadata for %s with version %s.",
subjectVersion.subject,
(subjectVersion.version == -1) ? "latest" : subjectVersion.version
),
ex
);
}
Expand Down Expand Up @@ -112,21 +119,6 @@ public void execute() throws MojoExecutionException, MojoFailureException {
throw new MojoExecutionException("Exception thrown while creating outputDirectory", ex);
}

List<Pattern> patterns = new ArrayList<>();

for (String subject : subjectPatterns) {
try {
getLog().debug(String.format("Creating pattern for '%s'", subject));
Pattern pattern = Pattern.compile(subject);
patterns.add(pattern);
} catch (Exception ex) {
throw new IllegalStateException(
String.format("Exception thrown while creating pattern '%s'", subject),
ex
);
}
}

Collection<String> allSubjects;
try {
getLog().info("Getting all subjects on schema registry...");
Expand All @@ -135,20 +127,20 @@ public void execute() throws MojoExecutionException, MojoFailureException {
throw new MojoExecutionException("Exception thrown", ex);
}

getLog().info(String.format("Schema Registry has %s subject(s).", allSubjects.size()));
Set<String> subjectsToDownload = new LinkedHashSet<>();

Set<SubjectVersion> subjectsToDownload = new LinkedHashSet<>();
for (String subject : allSubjects) {
for (Pattern pattern : patterns) {
getLog()
.debug(String.format("Checking '%s' against pattern '%s'", subject, pattern.pattern()));
Matcher matcher = pattern.matcher(subject);

if (matcher.matches()) {
for (String subjectPattern : subjectPatterns) {
int version = -1;
if(subjectPattern.indexOf(":") != -1) {
String[] patternAndVersion = subjectPattern.split(":");
subjectPattern = patternAndVersion[0];
version = Integer.valueOf(patternAndVersion[1]);
}
getLog().debug(String.format("Checking '%s' against pattern '%s'", subject, subjectPattern));
if(subject.matches(subjectPattern)) {
getLog().debug(String.format("'%s' matches pattern '%s' so downloading.", subject,
pattern.pattern()));
subjectsToDownload.add(subject);
break;
subjectPattern));
subjectsToDownload.add(new SubjectVersion(subject, version));
}
}
}
Expand Down Expand Up @@ -192,4 +184,5 @@ private String getExtension(ParsedSchema parsedSchema) {
return ".txt";
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.confluent.kafka.schemaregistry.maven;

public class SubjectVersion {
public final String subject;
public final int version;

public SubjectVersion(String subject, int version) {
this.subject = subject;
this.version = version;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ public void createMojo() {
@Test
public void specificSubjects() throws IOException, RestClientException, MojoFailureException, MojoExecutionException {
this.mojo.outputDirectory = this.tempDirectory;
this.mojo.subjectPatterns.clear();

List<File> filesToDownload = new ArrayList<>();
List<File> filesNotToDownload = new ArrayList<>();
this.mojo.subjectPatterns.clear();

for (int i = 0; i < 100; i++) {
String keySubject = String.format("TestSubject%03d-key", i);
Expand All @@ -70,13 +70,53 @@ public void specificSubjects() throws IOException, RestClientException, MojoFail
filesNotToDownload.add(valueSchemaFile);
}
}

this.mojo.execute();
for (File file: filesToDownload) {

for (File file : filesToDownload) {
Assert.assertThat(file.exists(), is(true));
}
for (File file: filesNotToDownload) {
for (File file : filesNotToDownload) {
Assert.assertThat(file.exists(), is(false));
}
}

@Test
public void specificSubjectAndVersion() throws IOException, RestClientException, MojoFailureException, MojoExecutionException {
this.mojo.outputDirectory = this.tempDirectory;
this.mojo.subjectPatterns.clear();

String valueSubject = "TestSubject-value";
Schema valueSchema = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL)));
this.mojo.client().register(valueSubject, new AvroSchema(valueSchema), 1, 101);

Schema valueSchemaUpdated = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.INT), Schema.create(Schema.Type.NULL)));
this.mojo.client().register(valueSubject, new AvroSchema(valueSchemaUpdated), 2, 102);

File valueSchemaFile = new File(this.tempDirectory, valueSubject + ".avsc");

String subjectPattern = "^TestSubject-(key|value)$:2";
this.mojo.subjectPatterns.add(subjectPattern);

this.mojo.execute();

Assert.assertThat(valueSchemaFile.exists(), is(true));
Assert.assertThat(new Schema.Parser().parse(valueSchemaFile), is(valueSchemaUpdated));
}

@Test(expected = MojoExecutionException.class)
public void specificSubjectAndVersionNotFound() throws IOException, RestClientException, MojoFailureException, MojoExecutionException {
this.mojo.outputDirectory = this.tempDirectory;
this.mojo.subjectPatterns.clear();

String valueSubject = "TestSubject-value";
Schema valueSchema = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL)));
this.mojo.client().register(valueSubject, new AvroSchema(valueSchema), 1, 101);

String subjectPattern = "^TestSubject-(key|value)$:9999";
this.mojo.subjectPatterns.add(subjectPattern);

this.mojo.execute();
}

}

0 comments on commit f1b5095

Please sign in to comment.