diff --git a/avro/avro-annotation-processor/build.gradle b/avro/avro-annotation-processor/build.gradle new file mode 100644 index 000000000..a4c9f17f2 --- /dev/null +++ b/avro/avro-annotation-processor/build.gradle @@ -0,0 +1,39 @@ +import org.apache.avro.tool.SpecificCompilerTool + +buildscript { + dependencies { + classpath "org.apache.avro:avro-tools:1.12.0" + } +} + +apply from: "${project.rootDir}/gradle/in-test-generated.gradle" + +dependencies { + api project(":annotation-processor-common") + api project(":kora-app-annotation-processor") + + testImplementation project(":avro:avro-common") + testImplementation testFixtures(project(":annotation-processor-common")) +} + +tasks.register("generateAvroClasses") { + group("build") + + var inputDir = "$projectDir/src/test/resources/avro" + var outputDir = "$buildDir/generated/sources/avro" + inputs.dir(inputDir) + outputs.dir(outputDir) + logging.captureStandardOutput(LogLevel.INFO); + logging.captureStandardError(LogLevel.ERROR) + + doFirst { + delete outputDir + } + + doLast { + var params = ["-bigDecimal", "schema", inputDir.toString(), outputDir.toString()] + new SpecificCompilerTool().run(System.in, System.out, System.err, params) + } +} + +test.dependsOn(tasks.generateAvroClasses) diff --git a/avro/avro-annotation-processor/src/main/java/ru/tinkoff/kora/avro/annotation/processor/AvroAnnotationProcessor.java b/avro/avro-annotation-processor/src/main/java/ru/tinkoff/kora/avro/annotation/processor/AvroAnnotationProcessor.java new file mode 100644 index 000000000..73862cd7e --- /dev/null +++ b/avro/avro-annotation-processor/src/main/java/ru/tinkoff/kora/avro/annotation/processor/AvroAnnotationProcessor.java @@ -0,0 +1,96 @@ +package ru.tinkoff.kora.avro.annotation.processor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.event.Level; +import ru.tinkoff.kora.annotation.processor.common.AbstractKoraProcessor; +import ru.tinkoff.kora.annotation.processor.common.LogUtils; +import ru.tinkoff.kora.annotation.processor.common.ProcessingErrorException; +import ru.tinkoff.kora.avro.annotation.processor.reader.AvroReaderGenerator; +import ru.tinkoff.kora.avro.annotation.processor.writer.AvroWriterGenerator; + +import javax.annotation.processing.ProcessingEnvironment; +import javax.annotation.processing.RoundEnvironment; +import javax.lang.model.element.ElementKind; +import javax.lang.model.element.TypeElement; +import java.util.Set; + +public class AvroAnnotationProcessor extends AbstractKoraProcessor { + + private static final Logger logger = LoggerFactory.getLogger(AvroAnnotationProcessor.class); + + private boolean initialized = false; + private TypeElement avroBinaryAnnotation; + private TypeElement avroJsonAnnotation; + private AvroWriterGenerator writerGenerator; + private AvroReaderGenerator readerGenerator; + + @Override + public Set getSupportedAnnotationTypes() { + return Set.of(AvroTypes.avroBinary.canonicalName(), AvroTypes.avroJson.canonicalName()); + } + + @Override + public synchronized void init(ProcessingEnvironment processingEnv) { + super.init(processingEnv); + this.avroBinaryAnnotation = processingEnv.getElementUtils().getTypeElement(AvroTypes.avroBinary.canonicalName()); + if (this.avroBinaryAnnotation == null) { + return; + } + + this.avroJsonAnnotation = processingEnv.getElementUtils().getTypeElement(AvroTypes.avroJson.canonicalName()); + this.initialized = true; + this.writerGenerator = new AvroWriterGenerator(processingEnv); + this.readerGenerator = new AvroReaderGenerator(processingEnv); + } + + @Override + public boolean process(Set annotations, RoundEnvironment roundEnv) { + if (!this.initialized) { + return false; + } + if (roundEnv.processingOver()) { + return false; + } + + var avroBinaryElements = roundEnv.getElementsAnnotatedWith(this.avroBinaryAnnotation).stream() + .filter(e -> e.getKind().isClass() || e.getKind() == ElementKind.INTERFACE) + .toList(); + if (!avroBinaryElements.isEmpty()) { + LogUtils.logElementsFull(logger, Level.DEBUG, "Generating Avro Binary Readers & Writers for", avroBinaryElements); + for (var e : avroBinaryElements) { + try { + this.readerGenerator.generateBinary((TypeElement) e); + } catch (ProcessingErrorException ex) { + ex.printError(this.processingEnv); + } + try { + this.writerGenerator.generateBinary((TypeElement) e); + } catch (ProcessingErrorException ex) { + ex.printError(this.processingEnv); + } + } + } + + var avroJsonElements = roundEnv.getElementsAnnotatedWith(this.avroJsonAnnotation).stream() + .filter(e -> e.getKind().isClass() || e.getKind() == ElementKind.INTERFACE) + .toList(); + if (!avroJsonElements.isEmpty()) { + LogUtils.logElementsFull(logger, Level.DEBUG, "Generating Avro Json Readers & Writers for", avroBinaryElements); + for (var e : avroJsonElements) { + try { + this.readerGenerator.generateJson((TypeElement) e); + } catch (ProcessingErrorException ex) { + ex.printError(this.processingEnv); + } + try { + this.writerGenerator.generateJson((TypeElement) e); + } catch (ProcessingErrorException ex) { + ex.printError(this.processingEnv); + } + } + } + + return false; + } +} diff --git a/avro/avro-annotation-processor/src/main/java/ru/tinkoff/kora/avro/annotation/processor/AvroTypes.java b/avro/avro-annotation-processor/src/main/java/ru/tinkoff/kora/avro/annotation/processor/AvroTypes.java new file mode 100644 index 000000000..af1e8fa43 --- /dev/null +++ b/avro/avro-annotation-processor/src/main/java/ru/tinkoff/kora/avro/annotation/processor/AvroTypes.java @@ -0,0 +1,21 @@ +package ru.tinkoff.kora.avro.annotation.processor; + +import com.squareup.javapoet.ClassName; + +public final class AvroTypes { + + private AvroTypes() {} + + public static final ClassName avroBinary = ClassName.get("ru.tinkoff.kora.avro.common.annotation", "AvroBinary"); + public static final ClassName avroJson = ClassName.get("ru.tinkoff.kora.avro.common.annotation", "AvroJson"); + + public static final ClassName reader = ClassName.get("ru.tinkoff.kora.avro.common", "AvroReader"); + public static final ClassName writer = ClassName.get("ru.tinkoff.kora.avro.common", "AvroWriter"); + + public static final ClassName schema = ClassName.get("org.apache.avro", "Schema"); + public static final ClassName specificData = ClassName.get("org.apache.avro.specific", "SpecificData"); + public static final ClassName datumReader = ClassName.get("org.apache.avro.specific", "SpecificDatumReader"); + public static final ClassName datumWriter = ClassName.get("org.apache.avro.specific", "SpecificDatumWriter"); + public static final ClassName decoderFactory = ClassName.get("org.apache.avro.io", "DecoderFactory"); + public static final ClassName encoderFactory = ClassName.get("org.apache.avro.io", "EncoderFactory"); +} diff --git a/avro/avro-annotation-processor/src/main/java/ru/tinkoff/kora/avro/annotation/processor/AvroUtils.java b/avro/avro-annotation-processor/src/main/java/ru/tinkoff/kora/avro/annotation/processor/AvroUtils.java new file mode 100644 index 000000000..69e5111d5 --- /dev/null +++ b/avro/avro-annotation-processor/src/main/java/ru/tinkoff/kora/avro/annotation/processor/AvroUtils.java @@ -0,0 +1,55 @@ +package ru.tinkoff.kora.avro.annotation.processor; + +import ru.tinkoff.kora.annotation.processor.common.NameUtils; + +import javax.lang.model.element.Element; +import javax.lang.model.element.TypeElement; +import javax.lang.model.type.TypeMirror; +import javax.lang.model.util.Elements; +import javax.lang.model.util.Types; + + +public final class AvroUtils { + + private AvroUtils() {} + + public static String classPackage(Elements elements, Element typeElement) { + return elements.getPackageOf(typeElement).getQualifiedName().toString(); + } + + public static String writerBinaryName(Element typeElement) { + return NameUtils.generatedType(typeElement, "AvroBinaryWriter"); + } + + public static String writerBinaryName(Types types, TypeMirror typeMirror) { + var typeElement = types.asElement(typeMirror); + return writerBinaryName(typeElement); + } + + public static String writerJsonName(Element typeElement) { + return NameUtils.generatedType(typeElement, "AvroJsonWriter"); + } + + public static String writerJsonName(Types types, TypeMirror typeMirror) { + var typeElement = types.asElement(typeMirror); + return writerJsonName(typeElement); + } + + public static String readerBinaryName(TypeElement typeElement) { + return NameUtils.generatedType(typeElement, "AvroBinaryReader"); + } + + public static String readerBinaryName(Types types, TypeMirror typeMirror) { + var typeElement = types.asElement(typeMirror); + return readerBinaryName((TypeElement) typeElement); + } + + public static String readerJsonName(TypeElement typeElement) { + return NameUtils.generatedType(typeElement, "AvroJsonReader"); + } + + public static String readerJsonName(Types types, TypeMirror typeMirror) { + var typeElement = types.asElement(typeMirror); + return readerJsonName((TypeElement) typeElement); + } +} diff --git a/avro/avro-annotation-processor/src/main/java/ru/tinkoff/kora/avro/annotation/processor/extension/AvroExtension.java b/avro/avro-annotation-processor/src/main/java/ru/tinkoff/kora/avro/annotation/processor/extension/AvroExtension.java new file mode 100644 index 000000000..3878b8390 --- /dev/null +++ b/avro/avro-annotation-processor/src/main/java/ru/tinkoff/kora/avro/annotation/processor/extension/AvroExtension.java @@ -0,0 +1,187 @@ +package ru.tinkoff.kora.avro.annotation.processor.extension; + +import com.squareup.javapoet.ClassName; +import jakarta.annotation.Nullable; +import ru.tinkoff.kora.annotation.processor.common.AnnotationUtils; +import ru.tinkoff.kora.annotation.processor.common.ProcessingErrorException; +import ru.tinkoff.kora.avro.annotation.processor.AvroTypes; +import ru.tinkoff.kora.avro.annotation.processor.AvroUtils; +import ru.tinkoff.kora.avro.annotation.processor.reader.AvroReaderGenerator; +import ru.tinkoff.kora.avro.annotation.processor.writer.AvroWriterGenerator; +import ru.tinkoff.kora.kora.app.annotation.processor.extension.ExtensionResult; +import ru.tinkoff.kora.kora.app.annotation.processor.extension.KoraExtension; + +import javax.annotation.processing.ProcessingEnvironment; +import javax.annotation.processing.RoundEnvironment; +import javax.lang.model.element.ElementKind; +import javax.lang.model.element.ExecutableElement; +import javax.lang.model.element.Modifier; +import javax.lang.model.element.TypeElement; +import javax.lang.model.type.DeclaredType; +import javax.lang.model.type.TypeKind; +import javax.lang.model.type.TypeMirror; +import javax.lang.model.util.Elements; +import javax.lang.model.util.Types; +import java.util.Set; + +public class AvroExtension implements KoraExtension { + + private final Types types; + private final Elements elements; + private final TypeMirror writerErasure; + private final TypeMirror readerErasure; + private final AvroReaderGenerator readerGenerator; + private final AvroWriterGenerator writerGenerator; + + public AvroExtension(ProcessingEnvironment processingEnv) { + this.types = processingEnv.getTypeUtils(); + this.elements = processingEnv.getElementUtils(); + + this.writerErasure = this.types.erasure(this.elements.getTypeElement(AvroTypes.writer.canonicalName()).asType()); + this.readerErasure = this.types.erasure(this.elements.getTypeElement(AvroTypes.reader.canonicalName()).asType()); + this.readerGenerator = new AvroReaderGenerator(processingEnv); + this.writerGenerator = new AvroWriterGenerator(processingEnv); + } + + @Nullable + @Override + public KoraExtensionDependencyGenerator getDependencyGenerator(RoundEnvironment roundEnvironment, TypeMirror typeMirror, Set tags) { + boolean isBinary = tags.isEmpty() || isBinary(tags); + boolean isJson = isJson(tags); + if (!isBinary && !isJson) { + return null; + } + + var erasure = this.types.erasure(typeMirror); + if (this.types.isSameType(erasure, this.writerErasure)) { + var writerType = (DeclaredType) typeMirror; + var targetType = writerType.getTypeArguments().get(0); + if (targetType.getKind() != TypeKind.DECLARED) { + return null; + } + + var targetElement = (TypeElement) this.types.asElement(targetType); + if (isBinary && AnnotationUtils.findAnnotation(targetElement, AvroTypes.avroBinary) != null) { + return KoraExtensionDependencyGenerator.generatedFrom(elements, targetElement, AvroTypes.writer); + } + if (isJson && AnnotationUtils.findAnnotation(targetElement, AvroTypes.avroJson) != null) { + return KoraExtensionDependencyGenerator.generatedFrom(elements, targetElement, AvroTypes.writer); + } + + if (targetElement.getKind() == ElementKind.ENUM + || targetElement.getKind().isInterface() + || targetElement.getModifiers().contains(Modifier.ABSTRACT)) { + return null; + } + + try { + return () -> this.generateWriter(targetType, tags, isBinary); + } catch (ProcessingErrorException e) { + return null; + } + } + + if (this.types.isSameType(erasure, this.readerErasure)) { + var readerType = (DeclaredType) typeMirror; + var targetType = readerType.getTypeArguments().get(0); + if (targetType.getKind() != TypeKind.DECLARED) { + return null; + } + + var targetElement = (TypeElement) types.asElement(targetType); + if (isBinary && AnnotationUtils.findAnnotation(targetElement, AvroTypes.avroBinary) != null) { + return KoraExtensionDependencyGenerator.generatedFrom(elements, targetElement, AvroTypes.reader); + } + if (isJson && AnnotationUtils.findAnnotation(targetElement, AvroTypes.avroJson) != null) { + return KoraExtensionDependencyGenerator.generatedFrom(elements, targetElement, AvroTypes.reader); + } + + if (targetElement.getKind() == ElementKind.ENUM + || targetElement.getKind().isInterface() + || targetElement.getModifiers().contains(Modifier.ABSTRACT)) { + return null; + } + + try { + return () -> this.generateReader(targetType, tags, isBinary); + } catch (ProcessingErrorException e) { + return null; + } + } + return null; + } + + private boolean isBinary(Set tags) { + return tags.equals(Set.of(AvroTypes.avroBinary.canonicalName())); + } + + private boolean isJson(Set tags) { + return tags.equals(Set.of(AvroTypes.avroJson.canonicalName())); + } + + @Nullable + private ExtensionResult generateReader(TypeMirror typeMirror, Set tags, boolean isBinary) { + var element = (TypeElement) this.types.asElement(typeMirror); + var packageElement = this.elements.getPackageOf(element).getQualifiedName().toString(); + var resultClassName = isBinary + ? AvroUtils.readerBinaryName(this.types, typeMirror) + : AvroUtils.readerJsonName(this.types, typeMirror); + var resultElement = this.elements.getTypeElement(packageElement + "." + resultClassName); + if (resultElement != null) { + return buildExtensionResult(resultElement, tags); + } + + ClassName annotation = isBinary ? AvroTypes.avroBinary : AvroTypes.avroJson; + if (AnnotationUtils.findAnnotation(element, annotation) != null) { + // annotation processor will handle that + return ExtensionResult.nextRound(); + } + + if (isBinary) { + this.readerGenerator.generateBinary(element); + } else { + this.readerGenerator.generateJson(element); + } + return ExtensionResult.nextRound(); + } + + @Nullable + private ExtensionResult generateWriter(TypeMirror typeMirror, Set tags, boolean isBinary) { + var element = (TypeElement) this.types.asElement(typeMirror); + var packageElement = this.elements.getPackageOf(element).getQualifiedName().toString(); + var resultClassName = isBinary + ? AvroUtils.writerBinaryName(this.types, typeMirror) + : AvroUtils.writerJsonName(this.types, typeMirror); + var resultElement = this.elements.getTypeElement(packageElement + "." + resultClassName); + if (resultElement != null) { + return buildExtensionResult(resultElement, tags); + } + + ClassName annotation = isBinary ? AvroTypes.avroBinary : AvroTypes.avroJson; + if (AnnotationUtils.findAnnotation(element, annotation) != null) { + // annotation processor will handle that + return ExtensionResult.nextRound(); + } + + if (isBinary) { + this.writerGenerator.generateBinary(element); + } else { + this.writerGenerator.generateJson(element); + } + return ExtensionResult.nextRound(); + } + + private ExtensionResult buildExtensionResult(TypeElement resultElement, Set tags) { + var constructor = findDefaultConstructor(resultElement); + return ExtensionResult.fromExecutable(constructor, tags); + } + + private ExecutableElement findDefaultConstructor(TypeElement resultElement) { + return resultElement.getEnclosedElements() + .stream() + .filter(e -> e.getKind() == ElementKind.CONSTRUCTOR) + .map(ExecutableElement.class::cast) + .findFirst() + .orElseThrow(() -> new ProcessingErrorException("No primary constructor found for " + resultElement, resultElement)); + } +} diff --git a/avro/avro-annotation-processor/src/main/java/ru/tinkoff/kora/avro/annotation/processor/extension/AvroExtensionFactory.java b/avro/avro-annotation-processor/src/main/java/ru/tinkoff/kora/avro/annotation/processor/extension/AvroExtensionFactory.java new file mode 100644 index 000000000..bd203703c --- /dev/null +++ b/avro/avro-annotation-processor/src/main/java/ru/tinkoff/kora/avro/annotation/processor/extension/AvroExtensionFactory.java @@ -0,0 +1,21 @@ +package ru.tinkoff.kora.avro.annotation.processor.extension; + +import ru.tinkoff.kora.avro.annotation.processor.AvroTypes; +import ru.tinkoff.kora.kora.app.annotation.processor.extension.ExtensionFactory; +import ru.tinkoff.kora.kora.app.annotation.processor.extension.KoraExtension; + +import javax.annotation.processing.ProcessingEnvironment; +import java.util.Optional; + +public class AvroExtensionFactory implements ExtensionFactory { + + @Override + public Optional create(ProcessingEnvironment processingEnvironment) { + var avro = processingEnvironment.getElementUtils().getTypeElement(AvroTypes.avroBinary.canonicalName()); + if (avro == null) { + return Optional.empty(); + } else { + return Optional.of(new AvroExtension(processingEnvironment)); + } + } +} diff --git a/avro/avro-annotation-processor/src/main/java/ru/tinkoff/kora/avro/annotation/processor/reader/AvroReaderGenerator.java b/avro/avro-annotation-processor/src/main/java/ru/tinkoff/kora/avro/annotation/processor/reader/AvroReaderGenerator.java new file mode 100644 index 000000000..8fed3f6fa --- /dev/null +++ b/avro/avro-annotation-processor/src/main/java/ru/tinkoff/kora/avro/annotation/processor/reader/AvroReaderGenerator.java @@ -0,0 +1,120 @@ +package ru.tinkoff.kora.avro.annotation.processor.reader; + +import com.squareup.javapoet.*; +import jakarta.annotation.Nullable; +import ru.tinkoff.kora.annotation.processor.common.CommonClassNames; +import ru.tinkoff.kora.annotation.processor.common.CommonUtils; +import ru.tinkoff.kora.avro.annotation.processor.AvroTypes; +import ru.tinkoff.kora.avro.annotation.processor.AvroUtils; + +import javax.annotation.processing.ProcessingEnvironment; +import javax.lang.model.element.Modifier; +import javax.lang.model.element.TypeElement; +import javax.lang.model.element.TypeParameterElement; +import java.io.IOException; +import java.io.InputStream; + +public class AvroReaderGenerator { + + private final ProcessingEnvironment env; + + public AvroReaderGenerator(ProcessingEnvironment processingEnvironment) { + this.env = processingEnvironment; + } + + public void generateBinary(TypeElement element) { + var typeName = TypeName.get(element.asType()); + var typeBuilder = TypeSpec.classBuilder(AvroUtils.readerBinaryName(element)) + .addAnnotation(AnnotationSpec.builder(CommonClassNames.koraGenerated) + .addMember("value", CodeBlock.of("$S", AvroReaderGenerator.class.getCanonicalName())) + .build()) + .addSuperinterface(ParameterizedTypeName.get(AvroTypes.reader, typeName)) + .addModifiers(Modifier.PUBLIC, Modifier.FINAL) + .addOriginatingElement(element); + + for (TypeParameterElement typeParameter : element.getTypeParameters()) { + typeBuilder.addTypeVariable(TypeVariableName.get(typeParameter)); + } + + typeBuilder.addField(FieldSpec.builder(AvroTypes.schema, "SCHEMA") + .addModifiers(Modifier.PRIVATE, Modifier.STATIC, Modifier.FINAL) + .initializer("$T.getClassSchema()", typeName) + .build()); + typeBuilder.addField(FieldSpec.builder(AvroTypes.specificData, "SPECIFIC_DATA") + .addModifiers(Modifier.PRIVATE, Modifier.STATIC, Modifier.FINAL) + .initializer("new $T().getSpecificData()", typeName) + .build()); + typeBuilder.addField(FieldSpec.builder(ParameterizedTypeName.get(AvroTypes.datumReader, typeName), "READER") + .addModifiers(Modifier.PRIVATE, Modifier.STATIC, Modifier.FINAL) + .initializer("new $T<>(SCHEMA, SCHEMA, SPECIFIC_DATA)", AvroTypes.datumReader) + .build()); + + var method = MethodSpec.methodBuilder("read") + .addModifiers(Modifier.PUBLIC, Modifier.FINAL) + .addException(IOException.class) + .addAnnotation(Nullable.class) + .addAnnotation(Override.class) + .addParameter(TypeName.get(InputStream.class), "value") + .returns(typeName); + method.beginControlFlow("if (value == null || value.available() == 0)"); + method.addStatement("return null"); + method.endControlFlow(); + method.addStatement("var decoder = $T.get().directBinaryDecoder(value, null)", AvroTypes.decoderFactory); + method.addStatement("return READER.read(new $T(), decoder)", typeName); + + typeBuilder.addMethod(method.build()); + TypeSpec spec = typeBuilder.build(); + + var packageElement = AvroUtils.classPackage(this.env.getElementUtils(), element); + var javaFile = JavaFile.builder(packageElement, spec).build(); + CommonUtils.safeWriteTo(this.env, javaFile); + } + + public void generateJson(TypeElement element) { + var typeName = TypeName.get(element.asType()); + var typeBuilder = TypeSpec.classBuilder(AvroUtils.readerJsonName(element)) + .addAnnotation(AnnotationSpec.builder(CommonClassNames.koraGenerated) + .addMember("value", CodeBlock.of("$S", AvroReaderGenerator.class.getCanonicalName())) + .build()) + .addSuperinterface(ParameterizedTypeName.get(AvroTypes.reader, typeName)) + .addModifiers(Modifier.PUBLIC, Modifier.FINAL) + .addOriginatingElement(element); + + for (TypeParameterElement typeParameter : element.getTypeParameters()) { + typeBuilder.addTypeVariable(TypeVariableName.get(typeParameter)); + } + + typeBuilder.addField(FieldSpec.builder(AvroTypes.schema, "SCHEMA") + .addModifiers(Modifier.PRIVATE, Modifier.STATIC, Modifier.FINAL) + .initializer("$T.getClassSchema()", typeName) + .build()); + typeBuilder.addField(FieldSpec.builder(AvroTypes.specificData, "SPECIFIC_DATA") + .addModifiers(Modifier.PRIVATE, Modifier.STATIC, Modifier.FINAL) + .initializer("new $T().getSpecificData()", typeName) + .build()); + typeBuilder.addField(FieldSpec.builder(ParameterizedTypeName.get(AvroTypes.datumReader, typeName), "READER") + .addModifiers(Modifier.PRIVATE, Modifier.STATIC, Modifier.FINAL) + .initializer("new $T<>(SCHEMA, SCHEMA, SPECIFIC_DATA)", AvroTypes.datumReader) + .build()); + + var method = MethodSpec.methodBuilder("read") + .addModifiers(Modifier.PUBLIC, Modifier.FINAL) + .addException(IOException.class) + .addAnnotation(Nullable.class) + .addAnnotation(Override.class) + .addParameter(TypeName.get(InputStream.class), "value") + .returns(typeName); + method.beginControlFlow("if (value == null || value.available() == 0)"); + method.addStatement("return null"); + method.endControlFlow(); + method.addStatement("var decoder = $T.get().jsonDecoder(SCHEMA, value)", AvroTypes.decoderFactory); + method.addStatement("return READER.read(new $T(), decoder)", typeName); + + typeBuilder.addMethod(method.build()); + TypeSpec spec = typeBuilder.build(); + + var packageElement = AvroUtils.classPackage(this.env.getElementUtils(), element); + var javaFile = JavaFile.builder(packageElement, spec).build(); + CommonUtils.safeWriteTo(this.env, javaFile); + } +} diff --git a/avro/avro-annotation-processor/src/main/java/ru/tinkoff/kora/avro/annotation/processor/writer/AvroWriterGenerator.java b/avro/avro-annotation-processor/src/main/java/ru/tinkoff/kora/avro/annotation/processor/writer/AvroWriterGenerator.java new file mode 100644 index 000000000..ba7e937a5 --- /dev/null +++ b/avro/avro-annotation-processor/src/main/java/ru/tinkoff/kora/avro/annotation/processor/writer/AvroWriterGenerator.java @@ -0,0 +1,133 @@ +package ru.tinkoff.kora.avro.annotation.processor.writer; + +import com.squareup.javapoet.*; +import jakarta.annotation.Nullable; +import ru.tinkoff.kora.annotation.processor.common.CommonClassNames; +import ru.tinkoff.kora.annotation.processor.common.CommonUtils; +import ru.tinkoff.kora.avro.annotation.processor.AvroTypes; +import ru.tinkoff.kora.avro.annotation.processor.AvroUtils; + +import javax.annotation.processing.ProcessingEnvironment; +import javax.lang.model.element.Modifier; +import javax.lang.model.element.TypeElement; +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +public class AvroWriterGenerator { + + private final ProcessingEnvironment env; + + public AvroWriterGenerator(ProcessingEnvironment processingEnvironment) { + this.env = processingEnvironment; + } + + public void generateBinary(TypeElement element) { + var typeName = TypeName.get(element.asType()); + var typeBuilder = TypeSpec.classBuilder(AvroUtils.writerBinaryName(element)) + .addAnnotation(AnnotationSpec.builder(CommonClassNames.koraGenerated) + .addMember("value", CodeBlock.of("$S", AvroWriterGenerator.class.getCanonicalName())) + .build()) + .addSuperinterface(ParameterizedTypeName.get(AvroTypes.writer, typeName)) + .addModifiers(Modifier.PUBLIC, Modifier.FINAL) + .addOriginatingElement(element); + + for (var typeParameter : element.getTypeParameters()) { + typeBuilder.addTypeVariable(TypeVariableName.get(typeParameter)); + } + + typeBuilder.addField(FieldSpec.builder(ArrayTypeName.of(TypeName.BYTE), "EMPTY") + .addModifiers(Modifier.PRIVATE, Modifier.STATIC, Modifier.FINAL) + .initializer("new byte[]{}") + .build()); + typeBuilder.addField(FieldSpec.builder(AvroTypes.schema, "SCHEMA") + .addModifiers(Modifier.PRIVATE, Modifier.STATIC, Modifier.FINAL) + .initializer("$T.getClassSchema()", typeName) + .build()); + typeBuilder.addField(FieldSpec.builder(AvroTypes.specificData, "SPECIFIC_DATA") + .addModifiers(Modifier.PRIVATE, Modifier.STATIC, Modifier.FINAL) + .initializer("new $T().getSpecificData()", typeName) + .build()); + typeBuilder.addField(FieldSpec.builder(ParameterizedTypeName.get(AvroTypes.datumWriter, typeName), "WRITER") + .addModifiers(Modifier.PRIVATE, Modifier.STATIC, Modifier.FINAL) + .initializer("new $T<>(SCHEMA, SPECIFIC_DATA)", AvroTypes.datumWriter) + .build()); + + var method = MethodSpec.methodBuilder("writeBytes") + .addModifiers(Modifier.PUBLIC, Modifier.FINAL) + .addException(IOException.class) + .addAnnotation(Override.class) + .addParameter(ParameterSpec.builder(typeName, "value") + .addAnnotation(Nullable.class).build()) + .returns(ArrayTypeName.of(TypeName.BYTE)); + method.beginControlFlow("if (value == null)"); + method.addStatement("return EMPTY"); + method.endControlFlow(); + method.beginControlFlow("try (var os = new $T())", ByteArrayOutputStream.class); + method.addStatement("var encoder = $T.get().directBinaryEncoder(os, null)", AvroTypes.encoderFactory); + method.addStatement("WRITER.write(value, encoder)"); + method.addStatement("encoder.flush()"); + method.addStatement("return os.toByteArray()"); + method.endControlFlow(); + + typeBuilder.addMethod(method.build()); + TypeSpec spec = typeBuilder.build(); + var packageElement = AvroUtils.classPackage(this.env.getElementUtils(), element); + var javaFile = JavaFile.builder(packageElement, spec).build(); + CommonUtils.safeWriteTo(this.env, javaFile); + } + + public void generateJson(TypeElement element) { + var typeName = TypeName.get(element.asType()); + var typeBuilder = TypeSpec.classBuilder(AvroUtils.writerJsonName(element)) + .addAnnotation(AnnotationSpec.builder(CommonClassNames.koraGenerated) + .addMember("value", CodeBlock.of("$S", AvroWriterGenerator.class.getCanonicalName())) + .build()) + .addSuperinterface(ParameterizedTypeName.get(AvroTypes.writer, typeName)) + .addModifiers(Modifier.PUBLIC, Modifier.FINAL) + .addOriginatingElement(element); + + for (var typeParameter : element.getTypeParameters()) { + typeBuilder.addTypeVariable(TypeVariableName.get(typeParameter)); + } + + typeBuilder.addField(FieldSpec.builder(ArrayTypeName.of(TypeName.BYTE), "EMPTY") + .addModifiers(Modifier.PRIVATE, Modifier.STATIC, Modifier.FINAL) + .initializer("new byte[]{}") + .build()); + typeBuilder.addField(FieldSpec.builder(AvroTypes.schema, "SCHEMA") + .addModifiers(Modifier.PRIVATE, Modifier.STATIC, Modifier.FINAL) + .initializer("$T.getClassSchema()", typeName) + .build()); + typeBuilder.addField(FieldSpec.builder(AvroTypes.specificData, "SPECIFIC_DATA") + .addModifiers(Modifier.PRIVATE, Modifier.STATIC, Modifier.FINAL) + .initializer("new $T().getSpecificData()", typeName) + .build()); + typeBuilder.addField(FieldSpec.builder(ParameterizedTypeName.get(AvroTypes.datumWriter, typeName), "WRITER") + .addModifiers(Modifier.PRIVATE, Modifier.STATIC, Modifier.FINAL) + .initializer("new $T<>(SCHEMA, SPECIFIC_DATA)", AvroTypes.datumWriter) + .build()); + + var method = MethodSpec.methodBuilder("writeBytes") + .addModifiers(Modifier.PUBLIC, Modifier.FINAL) + .addException(IOException.class) + .addAnnotation(Override.class) + .addParameter(ParameterSpec.builder(typeName, "value") + .addAnnotation(Nullable.class).build()) + .returns(ArrayTypeName.of(TypeName.BYTE)); + method.beginControlFlow("if (value == null)"); + method.addStatement("return EMPTY"); + method.endControlFlow(); + method.beginControlFlow("try (var os = new $T())", ByteArrayOutputStream.class); + method.addStatement("var encoder = $T.get().jsonEncoder(SCHEMA, os)", AvroTypes.encoderFactory); + method.addStatement("WRITER.write(value, encoder)"); + method.addStatement("encoder.flush()"); + method.addStatement("return os.toByteArray()"); + method.endControlFlow(); + + typeBuilder.addMethod(method.build()); + TypeSpec spec = typeBuilder.build(); + var packageElement = AvroUtils.classPackage(this.env.getElementUtils(), element); + var javaFile = JavaFile.builder(packageElement, spec).build(); + CommonUtils.safeWriteTo(this.env, javaFile); + } +} diff --git a/avro/avro-annotation-processor/src/main/resources/META-INF/services/javax.annotation.processing.Processor b/avro/avro-annotation-processor/src/main/resources/META-INF/services/javax.annotation.processing.Processor new file mode 100644 index 000000000..c6d800dd7 --- /dev/null +++ b/avro/avro-annotation-processor/src/main/resources/META-INF/services/javax.annotation.processing.Processor @@ -0,0 +1 @@ +ru.tinkoff.kora.avro.annotation.processor.AvroAnnotationProcessor diff --git a/avro/avro-annotation-processor/src/main/resources/META-INF/services/ru.tinkoff.kora.kora.app.annotation.processor.extension.ExtensionFactory b/avro/avro-annotation-processor/src/main/resources/META-INF/services/ru.tinkoff.kora.kora.app.annotation.processor.extension.ExtensionFactory new file mode 100644 index 000000000..e08917271 --- /dev/null +++ b/avro/avro-annotation-processor/src/main/resources/META-INF/services/ru.tinkoff.kora.kora.app.annotation.processor.extension.ExtensionFactory @@ -0,0 +1 @@ +ru.tinkoff.kora.avro.annotation.processor.extension.AvroExtensionFactory diff --git a/avro/avro-annotation-processor/src/test/java/ru/tinkoff/kora/avro/annotation/processor/AbstractAvroAnnotationProcessorTest.java b/avro/avro-annotation-processor/src/test/java/ru/tinkoff/kora/avro/annotation/processor/AbstractAvroAnnotationProcessorTest.java new file mode 100644 index 000000000..5b712147b --- /dev/null +++ b/avro/avro-annotation-processor/src/test/java/ru/tinkoff/kora/avro/annotation/processor/AbstractAvroAnnotationProcessorTest.java @@ -0,0 +1,269 @@ +package ru.tinkoff.kora.avro.annotation.processor; + +import org.apache.avro.generic.IndexedRecord; +import org.apache.avro.io.*; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificDatumWriter; +import org.intellij.lang.annotations.Language; +import org.jetbrains.annotations.Nullable; +import ru.tinkoff.kora.annotation.processor.common.AbstractAnnotationProcessorTest; +import ru.tinkoff.kora.avro.common.AvroReader; +import ru.tinkoff.kora.avro.common.AvroWriter; + +import java.io.*; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.time.Instant; +import java.util.Base64; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +public abstract class AbstractAvroAnnotationProcessorTest extends AbstractAnnotationProcessorTest { + @Override + protected String commonImports() { + return super.commonImports() + """ + import ru.tinkoff.kora.common.KoraApp; + import ru.tinkoff.kora.avro.common.annotation.*; + import ru.tinkoff.kora.avro.common.AvroReader; + import ru.tinkoff.kora.avro.common.AvroWriter; + import java.util.Optional; + """; + } + + protected IndexedRecord getTestAvroGenerated() { + IndexedRecord testAvro = (IndexedRecord) newGeneratedObject("TestAvro").get(); + testAvro.put(0, "cluster"); + testAvro.put(1, Instant.EPOCH); + testAvro.put(2, "descr"); + testAvro.put(3, 12345L); + testAvro.put(4, true); + return testAvro; + } + + protected byte[] getTestAvroAsBytes() { + return Base64.getDecoder().decode("DmNsdXN0ZXICAAIKZGVzY3IC8sABAgE="); + } + + protected String getTestAvroAsJson() { + return "{\"cluster\":\"cluster\",\"date\":{\"long\":0},\"description\":{\"string\":\"descr\"},\"counter\":{\"long\":12345},\"flag\":{\"boolean\":true}}"; + } + + protected void assertThatTestAvroValid(IndexedRecord expected, IndexedRecord actual) { + assertThat(actual).isNotNull(); + assertThat(actual.get(0).toString()).isEqualTo(actual.get(0).toString()); + assertThat(actual.get(1)).isEqualTo(expected.get(1)); + assertThat(actual.get(2).toString()).isEqualTo(actual.get(2).toString()); + assertThat(actual.get(3)).isEqualTo(expected.get(3)); + assertThat(actual.get(4)).isEqualTo(expected.get(4)); + } + + protected String getAvroClass() { + try { + List strings = Files.lines(new File("build/generated/sources/avro/tinkoff/kora/TestAvro.java").toPath()) + .map(s -> s.replace("tinkoff.kora.", "")) + .toList(); + String avro = String.join("\n", strings.subList(7, strings.size())); + return avro.replace("tinkoff.kora", testPackage()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + protected void compile(@Language("java") String... sources) { + var compileResult = compile(List.of(new AvroAnnotationProcessor()), sources); + if (compileResult.isFailed()) { + throw compileResult.compilationException(); + } + } + + @SuppressWarnings("unchecked") + protected AvroReader readerBinary(Class forClass, Object... params) { + try { + return (AvroReader) this.compileResult.loadClass("$" + forClass + "_AvroBinaryReader") + .getConstructors()[0] + .newInstance(params); + } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + @SuppressWarnings("unchecked") + protected AvroReader readerBinary(String forClass, Object... params) { + try { + return (AvroReader) this.compileResult.loadClass("$" + forClass + "_AvroBinaryReader") + .getConstructors()[0] + .newInstance(params); + } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + @SuppressWarnings("unchecked") + protected AvroReader readerJson(String forClass, Object... params) { + try { + return (AvroReader) this.compileResult.loadClass("$" + forClass + "_AvroJsonReader") + .getConstructors()[0] + .newInstance(params); + } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + @SuppressWarnings("unchecked") + protected AvroWriter writerBinary(String forClass, Object... params) { + try { + return (AvroWriter) this.compileResult.loadClass("$" + forClass + "_AvroBinaryWriter") + .getConstructors()[0] + .newInstance(params); + } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + @SuppressWarnings("unchecked") + protected AvroWriter writerJson(String forClass, Object... params) { + try { + return (AvroWriter) this.compileResult.loadClass("$" + forClass + "_AvroJsonWriter") + .getConstructors()[0] + .newInstance(params); + } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + protected ReaderAndWriter mapper(String forClass) { + return mapper(forClass, List.of(), List.of()); + } + + protected ReaderAndWriter mapper(String forClass, List readerParams, List writerParams) { + AvroReader reader = readerBinary(forClass, readerParams.toArray()); + AvroWriter writer = writerBinary(forClass, writerParams.toArray()); + return new ReaderAndWriter(reader, writer); + } + + protected static class ReaderAndWriter implements AvroReader, AvroWriter { + private final AvroReader reader; + private final AvroWriter writer; + + protected ReaderAndWriter(AvroReader reader, AvroWriter writer) { + this.reader = reader; + this.writer = writer; + } + + @Nullable + @Override + public T read(ByteBuffer buffer) throws IOException { + return reader.read(buffer); + } + + @Nullable + @Override + public T read(byte[] bytes) throws IOException { + return reader.read(bytes); + } + + @Nullable + @Override + public T read(byte[] bytes, int offset, int length) throws IOException { + return reader.read(bytes, offset, length); + } + + @Nullable + @Override + public T read(InputStream is) throws IOException { + return reader.read(is); + } + + @Override + public byte[] writeBytes(@Nullable T value) throws IOException { + return writer.writeBytes(value); + } + + public void verify(T expectedObject, String expectedAvro) { + verifyRead(expectedAvro, expectedObject); + verifyWrite(expectedObject, expectedAvro); + } + + public void verifyRead(String expectedAvro, T expectedObject) { + try { + var object = this.reader.read(expectedAvro.getBytes(StandardCharsets.UTF_8)); + assertThat(object).isEqualTo(expectedObject); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public void verifyWrite(T expectedObject, String expectedAvro) { + try { + var Avro = this.writer.writeBytes(expectedObject); + assertThat(Avro).asString(StandardCharsets.UTF_8).isEqualTo(expectedAvro); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + // json + protected byte[] writeAsJson(IndexedRecord value) { + try (var stream = new ByteArrayOutputStream()) { + var writer = new SpecificDatumWriter<>(value.getSchema()); + Encoder jsonEncoder = EncoderFactory.get().jsonEncoder(value.getSchema(), stream); + writer.write(value, jsonEncoder); + jsonEncoder.flush(); + return stream.toByteArray(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + // header + protected byte[] writeAsBinary(IndexedRecord value) { + try (var stream = new ByteArrayOutputStream()) { + Field fieldData = value.getClass().getDeclaredField("MODEL$"); + fieldData.setAccessible(true); + SpecificData data = (SpecificData) fieldData.get(value); + + var writer = new SpecificDatumWriter<>(value.getSchema(), data); + Encoder encoder = EncoderFactory.get().directBinaryEncoder(stream, null); + writer.write(value, encoder); + encoder.flush(); + return stream.toByteArray(); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + + // fields + protected IndexedRecord readAsBinary(byte[] value) { + try { + Field fieldData = getTestAvroGenerated().getClass().getDeclaredField("MODEL$"); + fieldData.setAccessible(true); + SpecificData data = (SpecificData) fieldData.get(value); + + var reader = new SpecificDatumReader<>(getTestAvroGenerated().getSchema(), getTestAvroGenerated().getSchema(), data); + var binaryDecoder = DecoderFactory.get().binaryDecoder(value, null); + return (IndexedRecord) reader.read(null, binaryDecoder); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + + protected IndexedRecord readAsJson(byte[] value) { + try { + Field fieldData = getTestAvroGenerated().getClass().getDeclaredField("MODEL$"); + fieldData.setAccessible(true); + SpecificData data = (SpecificData) fieldData.get(value); + + var reader = new SpecificDatumReader<>(getTestAvroGenerated().getSchema(), getTestAvroGenerated().getSchema(), data); + var binaryDecoder = DecoderFactory.get().jsonDecoder(getTestAvroGenerated().getSchema(), new ByteArrayInputStream(value)); + return (IndexedRecord) reader.read(null, binaryDecoder); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } +} diff --git a/avro/avro-annotation-processor/src/test/java/ru/tinkoff/kora/avro/annotation/processor/AvroBinaryTests.java b/avro/avro-annotation-processor/src/test/java/ru/tinkoff/kora/avro/annotation/processor/AvroBinaryTests.java new file mode 100644 index 000000000..7b37673cc --- /dev/null +++ b/avro/avro-annotation-processor/src/test/java/ru/tinkoff/kora/avro/annotation/processor/AvroBinaryTests.java @@ -0,0 +1,101 @@ +package ru.tinkoff.kora.avro.annotation.processor; + +import org.apache.avro.generic.IndexedRecord; +import org.junit.jupiter.api.Test; +import ru.tinkoff.kora.kora.app.annotation.processor.KoraAppProcessor; + +import java.io.IOException; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +public class AvroBinaryTests extends AbstractAvroAnnotationProcessorTest { + + @Test + public void testReaderFromExtension() { + compile(List.of(new KoraAppProcessor()), + getAvroClass(), + """ + @KoraApp + public interface TestApp { + @Root + default String root(AvroReader r) {return "";} + } + """); + + compileResult.assertSuccess(); + var reader = readerBinary("TestAvro"); + assertThat(reader).isNotNull(); + + var testAvro = getTestAvroGenerated(); + byte[] bytes = getTestAvroAsBytes(); + IndexedRecord read = reader.readUnchecked(bytes); + assertThatTestAvroValid(testAvro, read); + } + + @Test + public void testReaderBinaryFromExtension() { + compile(List.of(new KoraAppProcessor()), + getAvroClass(), + """ + @KoraApp + public interface TestApp { + @Root + default String root(@AvroBinary AvroReader r) {return "";} + } + """); + + compileResult.assertSuccess(); + var reader = readerBinary("TestAvro"); + assertThat(reader).isNotNull(); + + var testAvro = getTestAvroGenerated(); + byte[] bytes = getTestAvroAsBytes(); + IndexedRecord read = reader.readUnchecked(bytes); + assertThatTestAvroValid(testAvro, read); + } + + @Test + public void testWriterFromExtension() throws IOException { + compile(List.of(new KoraAppProcessor()), + getAvroClass(), + """ + @KoraApp + public interface TestApp { + @Root + default String root(AvroWriter r) {return "";} + } + """); + + compileResult.assertSuccess(); + var writer = writerBinary("TestAvro"); + assertThat(writer).isNotNull(); + + IndexedRecord testAvro = getTestAvroGenerated(); + byte[] bytes = writer.writeBytesUnchecked(testAvro); + IndexedRecord restored = readAsBinary(bytes); + assertThatTestAvroValid(testAvro, restored); + } + + @Test + public void testWriterBinaryFromExtension() { + compile(List.of(new KoraAppProcessor()), + getAvroClass(), + """ + @KoraApp + public interface TestApp { + @Root + default String root(@AvroBinary AvroWriter r) {return "";} + } + """); + + compileResult.assertSuccess(); + var writer = writerBinary("TestAvro"); + assertThat(writer).isNotNull(); + + IndexedRecord testAvro = getTestAvroGenerated(); + byte[] bytes = writer.writeBytesUnchecked(testAvro); + IndexedRecord restored = readAsBinary(bytes); + assertThatTestAvroValid(testAvro, restored); + } +} diff --git a/avro/avro-annotation-processor/src/test/java/ru/tinkoff/kora/avro/annotation/processor/AvroJsonTests.java b/avro/avro-annotation-processor/src/test/java/ru/tinkoff/kora/avro/annotation/processor/AvroJsonTests.java new file mode 100644 index 000000000..ed78aaccd --- /dev/null +++ b/avro/avro-annotation-processor/src/test/java/ru/tinkoff/kora/avro/annotation/processor/AvroJsonTests.java @@ -0,0 +1,57 @@ +package ru.tinkoff.kora.avro.annotation.processor; + +import org.apache.avro.generic.IndexedRecord; +import org.junit.jupiter.api.Test; +import ru.tinkoff.kora.kora.app.annotation.processor.KoraAppProcessor; + +import java.nio.charset.StandardCharsets; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +public class AvroJsonTests extends AbstractAvroAnnotationProcessorTest { + + @Test + public void testReaderFromExtension() { + compile(List.of(new KoraAppProcessor()), + getAvroClass(), + """ + @KoraApp + public interface TestApp { + @Root + default String root(@AvroJson AvroReader r) {return "";} + } + """); + + compileResult.assertSuccess(); + var reader = readerJson("TestAvro"); + assertThat(reader).isNotNull(); + + var testAvro = getTestAvroGenerated(); + byte[] bytes = getTestAvroAsJson().getBytes(StandardCharsets.UTF_8); + IndexedRecord read = reader.readUnchecked(bytes); + assertThatTestAvroValid(testAvro, read); + } + + @Test + public void testWriterFromExtension() { + compile(List.of(new KoraAppProcessor()), + getAvroClass(), + """ + @KoraApp + public interface TestApp { + @Root + default String root(@AvroJson AvroWriter r) {return "";} + } + """); + + compileResult.assertSuccess(); + var writer = writerJson("TestAvro"); + assertThat(writer).isNotNull(); + + IndexedRecord testAvro = getTestAvroGenerated(); + byte[] bytes = writer.writeBytesUnchecked(testAvro); + IndexedRecord restored = readAsJson(bytes); + assertThatTestAvroValid(testAvro, restored); + } +} diff --git a/avro/avro-annotation-processor/src/test/resources/avro/TestAvro.avsc b/avro/avro-annotation-processor/src/test/resources/avro/TestAvro.avsc new file mode 100644 index 000000000..f8b3bd82e --- /dev/null +++ b/avro/avro-annotation-processor/src/test/resources/avro/TestAvro.avsc @@ -0,0 +1,43 @@ +{ + "type": "record", + "name": "TestAvro", + "namespace": "tinkoff.kora", + "fields": [ + { + "name": "cluster", + "type": "string" + }, + { + "name": "date", + "type": [ + "null", + { + "type": "long", + "logicalType": "timestamp-millis" + } + ] + }, + { + "name": "description", + "type": [ + "null", + "string" + ] + }, + { + "name": "counter", + "type": [ + "null", + "long" + ] + }, + { + "name": "flag", + "type": [ + "null", + "boolean" + ], + "default": null + } + ] +} diff --git a/avro/avro-common/build.gradle b/avro/avro-common/build.gradle new file mode 100644 index 000000000..48c9232b9 --- /dev/null +++ b/avro/avro-common/build.gradle @@ -0,0 +1,5 @@ +dependencies { + api project(":common") + + api libs.avro +} diff --git a/avro/avro-common/src/main/java/ru/tinkoff/kora/avro/common/AvroReader.java b/avro/avro-common/src/main/java/ru/tinkoff/kora/avro/common/AvroReader.java new file mode 100644 index 000000000..cfd621593 --- /dev/null +++ b/avro/avro-common/src/main/java/ru/tinkoff/kora/avro/common/AvroReader.java @@ -0,0 +1,75 @@ +package ru.tinkoff.kora.avro.common; + +import jakarta.annotation.Nullable; +import org.apache.avro.generic.IndexedRecord; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; + +/** + * Русский: Контракт читателя AVRO со всеми методами чтения + *
+ * English: AVRO reader contract with all read methods + */ +public interface AvroReader { + + @Nullable + T read(InputStream is) throws IOException; + + @Nullable + default T read(byte[] bytes) throws IOException { + try (var is = new ByteArrayInputStream(bytes)) { + return read(is); + } + } + + @Nullable + default T read(byte[] bytes, int offset, int length) throws IOException { + try (var is = new ByteArrayInputStream(bytes, offset, length)) { + return read(is); + } + } + + @Nullable + default T read(ByteBuffer buffer) throws IOException { + if (buffer.hasArray()) { + try (var is = new ByteArrayInputStream(buffer.array())) { + return read(is); + } + } else { + try (var is = new ByteBufferInputStream(buffer)) { + return read(is); + } + } + } + + @Nullable + default T readUnchecked(byte[] bytes) throws UncheckedIOException { + try { + return read(bytes); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Nullable + default T readUnchecked(byte[] bytes, int offset, int length) throws UncheckedIOException { + try { + return read(bytes, offset, length); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Nullable + default T readUnchecked(InputStream is) throws UncheckedIOException { + try { + return read(is); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } +} diff --git a/avro/avro-common/src/main/java/ru/tinkoff/kora/avro/common/AvroWriter.java b/avro/avro-common/src/main/java/ru/tinkoff/kora/avro/common/AvroWriter.java new file mode 100644 index 000000000..12368456c --- /dev/null +++ b/avro/avro-common/src/main/java/ru/tinkoff/kora/avro/common/AvroWriter.java @@ -0,0 +1,25 @@ +package ru.tinkoff.kora.avro.common; + +import jakarta.annotation.Nullable; +import org.apache.avro.generic.IndexedRecord; + +import java.io.IOException; +import java.io.UncheckedIOException; + +/** + * Русский: Контракт писателя AVRO со всеми методами записи + *
+ * English: AVRO writer contract with all write methods + */ +public interface AvroWriter { + + byte[] writeBytes(@Nullable T value) throws IOException; + + default byte[] writeBytesUnchecked(@Nullable T value) throws UncheckedIOException { + try { + return writeBytes(value); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } +} diff --git a/avro/avro-common/src/main/java/ru/tinkoff/kora/avro/common/ByteBufferInputStream.java b/avro/avro-common/src/main/java/ru/tinkoff/kora/avro/common/ByteBufferInputStream.java new file mode 100644 index 000000000..6608478a0 --- /dev/null +++ b/avro/avro-common/src/main/java/ru/tinkoff/kora/avro/common/ByteBufferInputStream.java @@ -0,0 +1,124 @@ +package ru.tinkoff.kora.avro.common; + +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.Objects; + +final class ByteBufferInputStream extends InputStream { + + /** + * The input ByteBuffer that was provided. + * The ByteBuffer should be supplied with position and limit correctly set as appropriate + */ + private final ByteBuffer backendBuffer; + + public ByteBufferInputStream(ByteBuffer backendBuffer) { + Objects.requireNonNull(backendBuffer, "Given backend buffer can not be null!"); + this.backendBuffer = backendBuffer; + this.backendBuffer.mark(); // to prevent java.nio.InvalidMarkException on InputStream.reset() if mark had not been set + } + + /** + * Reads the next byte of data from this ByteBuffer. The value byte is returned as an int in the range 0-255. + * If no byte is available because the end of the buffer has been reached, the value -1 is returned. + * + * @return the next byte of data, or -1 if the limit/end of the buffer has been reached. + */ + public int read() { + return backendBuffer.hasRemaining() + ? (backendBuffer.get() & 0xff) + : -1; + } + + /** + * Reads up to len bytes of data into an array of bytes from this ByteBuffer. + * If the buffer has no remaining bytes, then -1 is returned to indicate end of file. + * Otherwise, the number k of bytes read is equal to the smaller of len and buffer remaining. + * + * @param b the buffer into which the data is read. + * @param off the start offset in the destination array b + * @param len the maximum number of bytes read. + * @return the total number of bytes read into the buffer, or -1 if there is no more data because the limit/end of + * the ByteBuffer has been reached. + * @throws NullPointerException If b is null. + * @throws IndexOutOfBoundsException If off is negative, len is negative, or len is greater than b.length - off + */ + public int read(byte b[], int off, int len) { + if (b == null) { + throw new NullPointerException(); + } else if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } + + if (!backendBuffer.hasRemaining()) { + return -1; + } + + int remaining = backendBuffer.remaining(); + if (len > remaining) { + len = remaining; + } + + if (len <= 0) { + return 0; + } + backendBuffer.get(b, off, len); + return len; + } + + /** + * Skips n bytes of input from this ByteBuffer. Fewer bytes might be skipped if the limit is reached. + * + * @param n the number of bytes to be skipped. + * @return the actual number of bytes skipped. + */ + public long skip(long n) { + int skipAmount = (n < 0) + ? 0 + : ((n > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) n); + + if (skipAmount > backendBuffer.remaining()) { + skipAmount = backendBuffer.remaining(); + } + + int newPos = backendBuffer.position() + skipAmount; + backendBuffer.position(newPos); + return skipAmount; + } + + /** + * Returns remaining bytes available in this ByteBuffer + * + * @return the number of remaining bytes that can be read (or skipped over) from this ByteBuffer. + */ + public int available() { + return backendBuffer.remaining(); + } + + public boolean markSupported() { + return true; + } + + /** + * Set the current marked position in the ByteBuffer. + *

Note: The readAheadLimit for this class has no meaning. + */ + public void mark(int readAheadLimit) { + backendBuffer.mark(); + } + + /** + * Resets the ByteBuffer to the marked position. + */ + public void reset() { + backendBuffer.reset(); + } + + /** + * Closing a ByteBuffer has no effect. + * The methods in this class can be called after the stream has been closed without generating an IOException. + */ + public void close() { + + } +} diff --git a/avro/avro-common/src/main/java/ru/tinkoff/kora/avro/common/annotation/AvroBinary.java b/avro/avro-common/src/main/java/ru/tinkoff/kora/avro/common/annotation/AvroBinary.java new file mode 100644 index 000000000..6c0ba1c11 --- /dev/null +++ b/avro/avro-common/src/main/java/ru/tinkoff/kora/avro/common/annotation/AvroBinary.java @@ -0,0 +1,20 @@ +package ru.tinkoff.kora.avro.common.annotation; + +import ru.tinkoff.kora.common.Tag; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Русский: Аннотация указывает что для типа представлен в бинарном формате AVRO. + *


+ * English: Annotation specifies that for the type is represented in AVRO binary format. + */ +@Tag(AvroBinary.class) +@Target({ElementType.TYPE, ElementType.PARAMETER, ElementType.METHOD, ElementType.TYPE_USE}) +@Retention(RetentionPolicy.RUNTIME) +public @interface AvroBinary { + +} diff --git a/avro/avro-common/src/main/java/ru/tinkoff/kora/avro/common/annotation/AvroJson.java b/avro/avro-common/src/main/java/ru/tinkoff/kora/avro/common/annotation/AvroJson.java new file mode 100644 index 000000000..101a3b909 --- /dev/null +++ b/avro/avro-common/src/main/java/ru/tinkoff/kora/avro/common/annotation/AvroJson.java @@ -0,0 +1,20 @@ +package ru.tinkoff.kora.avro.common.annotation; + +import ru.tinkoff.kora.common.Tag; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Русский: Аннотация указывает что для типа представлен в формате AVRO в виде JSON. + *
+ * English: Annotation specifies that for the type is represented in AVRO format as JSON. + */ +@Tag(AvroJson.class) +@Target({ElementType.TYPE, ElementType.PARAMETER, ElementType.METHOD, ElementType.TYPE_USE}) +@Retention(RetentionPolicy.RUNTIME) +public @interface AvroJson { + +} diff --git a/avro/avro-module/build.gradle b/avro/avro-module/build.gradle new file mode 100644 index 000000000..7dcd8ab9b --- /dev/null +++ b/avro/avro-module/build.gradle @@ -0,0 +1,11 @@ +dependencies { + compileOnly project(":http:http-server-common") + compileOnly project(":http:http-client-common") + compileOnly project(":kafka:kafka") + + api project(":avro:avro-common") + + api("io.confluent:kafka-avro-serializer:7.7.2") { + exclude group: "org.apache.kafka", module: "kafka-clients" + } +} diff --git a/avro/avro-module/src/main/java/ru/tinkoff/kora/avro/module/AvroModule.java b/avro/avro-module/src/main/java/ru/tinkoff/kora/avro/module/AvroModule.java new file mode 100644 index 000000000..f762cce4c --- /dev/null +++ b/avro/avro-module/src/main/java/ru/tinkoff/kora/avro/module/AvroModule.java @@ -0,0 +1,157 @@ +package ru.tinkoff.kora.avro.module; + +import org.apache.avro.specific.SpecificRecord; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import ru.tinkoff.kora.avro.common.AvroReader; +import ru.tinkoff.kora.avro.common.AvroWriter; +import ru.tinkoff.kora.avro.common.annotation.AvroBinary; +import ru.tinkoff.kora.avro.common.annotation.AvroJson; +import ru.tinkoff.kora.avro.module.http.client.*; +import ru.tinkoff.kora.avro.module.http.server.AvroAsyncHttpServerRequestMapper; +import ru.tinkoff.kora.avro.module.http.server.AvroHttpServerRequestMapper; +import ru.tinkoff.kora.avro.module.kafka.KafkaAvroTypedDeserializer; +import ru.tinkoff.kora.avro.module.kafka.KafkaAvroTypedSerializer; +import ru.tinkoff.kora.common.DefaultComponent; +import ru.tinkoff.kora.http.common.HttpResponseEntity; +import ru.tinkoff.kora.http.common.body.HttpBody; +import ru.tinkoff.kora.http.server.common.HttpServerResponse; +import ru.tinkoff.kora.http.server.common.handler.HttpServerResponseMapper; + +public interface AvroModule { + + // Kafka + @AvroBinary + @DefaultComponent + default Serializer avroBinaryKafkaSpecificSerializer(@AvroBinary AvroWriter writer) { + return new KafkaAvroTypedSerializer<>(writer); + } + + @AvroJson + @DefaultComponent + default Serializer avroAvroKafkapecificSerializer(@AvroJson AvroWriter writer) { + return new KafkaAvroTypedSerializer<>(writer); + } + + @AvroBinary + @DefaultComponent + default Deserializer avroBinaryKafkaSpecificDeserializer(@AvroBinary AvroReader reader) { + return new KafkaAvroTypedDeserializer<>(reader); + } + + @AvroJson + @DefaultComponent + default Deserializer avroAvroKafkaSpecificDeserializer(@AvroJson AvroReader reader) { + return new KafkaAvroTypedDeserializer<>(reader); + } + + // HTTP Server + @AvroBinary + @DefaultComponent + default HttpServerResponseMapper avroBinaryHttpServerResponseMapper(@AvroBinary AvroWriter writer) { + return (ctx, request, result) -> HttpServerResponse.of(200, HttpBody.of("application/avro", writer.writeBytes(result))); + } + + @AvroJson + @DefaultComponent + default HttpServerResponseMapper avroAvroHttpServerResponseMapper(@AvroJson AvroWriter writer) { + return (ctx, request, result) -> HttpServerResponse.of(200, HttpBody.json(writer.writeBytes(result))); + } + + @AvroBinary + @DefaultComponent + default HttpServerResponseMapper> avroBinaryHttpServerResponseEntityMapper(@AvroBinary AvroWriter writer) { + return (ctx, request, result) -> HttpServerResponse.of(result.code(), result.headers(), HttpBody.of("application/avro", writer.writeBytes(result.body()))); + } + + @AvroJson + @DefaultComponent + default HttpServerResponseMapper> avroAvroHttpServerResponseEntityMapper(@AvroJson AvroWriter writer) { + return (ctx, request, result) -> HttpServerResponse.of(result.code(), result.headers(), HttpBody.json(writer.writeBytes(result.body()))); + } + + @AvroBinary + @DefaultComponent + default AvroHttpServerRequestMapper avroBinaryRequestMapper(@AvroBinary AvroReader reader) { + return new AvroHttpServerRequestMapper<>(reader); + } + + @AvroJson + @DefaultComponent + default AvroHttpServerRequestMapper avroJsonRequestMapper(@AvroJson AvroReader reader) { + return new AvroHttpServerRequestMapper<>(reader); + } + + @AvroBinary + @DefaultComponent + default AvroAsyncHttpServerRequestMapper avroBinaryAsyncHttpServerRequestMapper(@AvroBinary AvroReader reader) { + return new AvroAsyncHttpServerRequestMapper<>(reader); + } + + @AvroJson + @DefaultComponent + default AvroAsyncHttpServerRequestMapper avroJsonAsyncHttpServerRequestMapper(@AvroJson AvroReader reader) { + return new AvroAsyncHttpServerRequestMapper<>(reader); + } + + // HTTP Client + @AvroBinary + @DefaultComponent + default AvroHttpClientRequestMapper avroBinaryHttpClientRequestMapper(@AvroBinary AvroWriter avroWriter) { + return new AvroHttpClientRequestMapper<>(avroWriter); + } + + @AvroJson + @DefaultComponent + default AvroHttpClientRequestMapper avroJsonHttpClientRequestMapper(@AvroJson AvroWriter avroWriter) { + return new AvroHttpClientRequestMapper<>(avroWriter); + } + + @AvroBinary + @DefaultComponent + default AvroHttpClientResponseMapper avroBinaryHttpClientResponseMapper(@AvroBinary AvroReader reader) { + return new AvroHttpClientResponseMapper<>(reader); + } + + @AvroJson + @DefaultComponent + default AvroHttpClientResponseMapper avroJsonHttpClientResponseMapper(@AvroJson AvroReader reader) { + return new AvroHttpClientResponseMapper<>(reader); + } + + @AvroBinary + @DefaultComponent + default AvroAsyncHttpClientResponseMapper avroBinaryAsyncHttpClientResponseMapper(@AvroBinary AvroReader reader) { + return new AvroAsyncHttpClientResponseMapper<>(reader); + } + + @AvroJson + @DefaultComponent + default AvroAsyncHttpClientResponseMapper avroJsonAsyncHttpClientResponseMapper(@AvroJson AvroReader reader) { + return new AvroAsyncHttpClientResponseMapper<>(reader); + } + + @AvroBinary + @DefaultComponent + default AvroHttpClientResponseEntityMapper avroBinaryHttpClientResponseEntityMapper(@AvroBinary AvroReader reader) { + return new AvroHttpClientResponseEntityMapper<>(reader); + } + + @AvroJson + @DefaultComponent + default AvroHttpClientResponseEntityMapper avroJsonHttpClientResponseEntityMapper(@AvroJson AvroReader reader) { + return new AvroHttpClientResponseEntityMapper<>(reader); + } + + @AvroBinary + @DefaultComponent + default AvroAsyncHttpClientResponseEntityMapper avroBinaryAsyncHttpClientResponseEntityMapper(@AvroBinary AvroReader reader) { + return new AvroAsyncHttpClientResponseEntityMapper<>(reader); + } + + @AvroJson + @DefaultComponent + default AvroAsyncHttpClientResponseEntityMapper avroJsonAsyncHttpClientResponseEntityMapper(@AvroJson AvroReader reader) { + return new AvroAsyncHttpClientResponseEntityMapper<>(reader); + } +} diff --git a/avro/avro-module/src/main/java/ru/tinkoff/kora/avro/module/http/AvroHttpBodyOutput.java b/avro/avro-module/src/main/java/ru/tinkoff/kora/avro/module/http/AvroHttpBodyOutput.java new file mode 100644 index 000000000..7b1479f7c --- /dev/null +++ b/avro/avro-module/src/main/java/ru/tinkoff/kora/avro/module/http/AvroHttpBodyOutput.java @@ -0,0 +1,55 @@ +package ru.tinkoff.kora.avro.module.http; + +import jakarta.annotation.Nullable; +import org.apache.avro.specific.SpecificRecord; +import ru.tinkoff.kora.avro.common.AvroWriter; +import ru.tinkoff.kora.common.Context; +import ru.tinkoff.kora.common.util.flow.LazySingleSubscription; +import ru.tinkoff.kora.http.common.body.HttpBodyOutput; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.Flow; + +public final class AvroHttpBodyOutput implements HttpBodyOutput { + + private final AvroWriter writer; + private final Context context; + @Nullable + private final T value; + + public AvroHttpBodyOutput(AvroWriter writer, Context context, @Nullable T value) { + this.writer = writer; + this.value = value; + this.context = context; + } + + @Override + public long contentLength() { + return -1; + } + + @Override + public String contentType() { + return "application/avro"; + } + + @Override + public void subscribe(Flow.Subscriber subscriber) { + subscriber.onSubscribe(new LazySingleSubscription<>(subscriber, context, () -> { + var resultBytes = this.writer.writeBytes(value); + return ByteBuffer.wrap(resultBytes); + })); + } + + @Override + public void write(OutputStream os) throws IOException { + this.writer.writeBytes(this.value); + } + + @Override + public void close() { + + } +} diff --git a/avro/avro-module/src/main/java/ru/tinkoff/kora/avro/module/http/client/AvroAsyncHttpClientResponseEntityMapper.java b/avro/avro-module/src/main/java/ru/tinkoff/kora/avro/module/http/client/AvroAsyncHttpClientResponseEntityMapper.java new file mode 100644 index 000000000..f7c3576b1 --- /dev/null +++ b/avro/avro-module/src/main/java/ru/tinkoff/kora/avro/module/http/client/AvroAsyncHttpClientResponseEntityMapper.java @@ -0,0 +1,35 @@ +package ru.tinkoff.kora.avro.module.http.client; + +import jakarta.annotation.Nonnull; +import org.apache.avro.specific.SpecificRecord; +import ru.tinkoff.kora.avro.common.AvroReader; +import ru.tinkoff.kora.common.util.FlowUtils; +import ru.tinkoff.kora.http.client.common.HttpClientDecoderException; +import ru.tinkoff.kora.http.client.common.response.HttpClientResponse; +import ru.tinkoff.kora.http.client.common.response.HttpClientResponseMapper; +import ru.tinkoff.kora.http.common.HttpResponseEntity; + +import java.io.IOException; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; + +public class AvroAsyncHttpClientResponseEntityMapper implements HttpClientResponseMapper>> { + + private final AvroReader reader; + + public AvroAsyncHttpClientResponseEntityMapper(AvroReader reader) { + this.reader = reader; + } + + @Override + public CompletionStage> apply(@Nonnull HttpClientResponse response) throws HttpClientDecoderException { + return FlowUtils.toByteArrayFuture(response.body()).thenApply(bytes -> { + try { + var value = this.reader.read(bytes); + return HttpResponseEntity.of(response.code(), response.headers().toMutable(), value); + } catch (IOException e) { + throw new CompletionException(e); + } + }); + } +} diff --git a/avro/avro-module/src/main/java/ru/tinkoff/kora/avro/module/http/client/AvroAsyncHttpClientResponseMapper.java b/avro/avro-module/src/main/java/ru/tinkoff/kora/avro/module/http/client/AvroAsyncHttpClientResponseMapper.java new file mode 100644 index 000000000..54f9c140c --- /dev/null +++ b/avro/avro-module/src/main/java/ru/tinkoff/kora/avro/module/http/client/AvroAsyncHttpClientResponseMapper.java @@ -0,0 +1,31 @@ +package ru.tinkoff.kora.avro.module.http.client; + +import org.apache.avro.specific.SpecificRecord; +import ru.tinkoff.kora.avro.common.AvroReader; +import ru.tinkoff.kora.common.util.FlowUtils; +import ru.tinkoff.kora.http.client.common.response.HttpClientResponse; +import ru.tinkoff.kora.http.client.common.response.HttpClientResponseMapper; + +import java.io.IOException; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; + +public class AvroAsyncHttpClientResponseMapper implements HttpClientResponseMapper> { + + private final AvroReader reader; + + public AvroAsyncHttpClientResponseMapper(AvroReader reader) { + this.reader = reader; + } + + @Override + public CompletionStage apply(HttpClientResponse response) { + return FlowUtils.toByteArrayFuture(response.body()).thenApply(bytes -> { + try { + return this.reader.read(bytes); + } catch (IOException e) { + throw new CompletionException(e); + } + }); + } +} diff --git a/avro/avro-module/src/main/java/ru/tinkoff/kora/avro/module/http/client/AvroHttpClientRequestMapper.java b/avro/avro-module/src/main/java/ru/tinkoff/kora/avro/module/http/client/AvroHttpClientRequestMapper.java new file mode 100644 index 000000000..1388ef9b9 --- /dev/null +++ b/avro/avro-module/src/main/java/ru/tinkoff/kora/avro/module/http/client/AvroHttpClientRequestMapper.java @@ -0,0 +1,22 @@ +package ru.tinkoff.kora.avro.module.http.client; + +import org.apache.avro.specific.SpecificRecord; +import ru.tinkoff.kora.avro.common.AvroWriter; +import ru.tinkoff.kora.avro.module.http.AvroHttpBodyOutput; +import ru.tinkoff.kora.common.Context; +import ru.tinkoff.kora.http.client.common.request.HttpClientRequestMapper; +import ru.tinkoff.kora.http.common.body.HttpBodyOutput; + +public class AvroHttpClientRequestMapper implements HttpClientRequestMapper { + + private final AvroWriter writer; + + public AvroHttpClientRequestMapper(AvroWriter writer) { + this.writer = writer; + } + + @Override + public HttpBodyOutput apply(Context ctx, T value) { + return new AvroHttpBodyOutput<>(this.writer, ctx, value); + } +} diff --git a/avro/avro-module/src/main/java/ru/tinkoff/kora/avro/module/http/client/AvroHttpClientResponseEntityMapper.java b/avro/avro-module/src/main/java/ru/tinkoff/kora/avro/module/http/client/AvroHttpClientResponseEntityMapper.java new file mode 100644 index 000000000..826ea01a0 --- /dev/null +++ b/avro/avro-module/src/main/java/ru/tinkoff/kora/avro/module/http/client/AvroHttpClientResponseEntityMapper.java @@ -0,0 +1,50 @@ +package ru.tinkoff.kora.avro.module.http.client; + +import jakarta.annotation.Nonnull; +import org.apache.avro.specific.SpecificRecord; +import ru.tinkoff.kora.avro.common.AvroReader; +import ru.tinkoff.kora.http.client.common.HttpClientDecoderException; +import ru.tinkoff.kora.http.client.common.HttpClientException; +import ru.tinkoff.kora.http.client.common.HttpClientUnknownException; +import ru.tinkoff.kora.http.client.common.response.HttpClientResponse; +import ru.tinkoff.kora.http.client.common.response.HttpClientResponseMapper; +import ru.tinkoff.kora.http.common.HttpResponseEntity; + +import java.io.IOException; +import java.util.concurrent.ExecutionException; + +public final class AvroHttpClientResponseEntityMapper implements HttpClientResponseMapper> { + + private final AvroReader reader; + + public AvroHttpClientResponseEntityMapper(AvroReader reader) { + this.reader = reader; + } + + @Override + public HttpResponseEntity apply(@Nonnull HttpClientResponse response) throws IOException, HttpClientDecoderException { + try (var body = response.body(); + var is = body.asInputStream()) { + if (is != null) { + var value = reader.read(is); + return HttpResponseEntity.of(response.code(), response.headers().toMutable(), value); + } + + try { + var bytes = body.asArrayStage().toCompletableFuture().get(); + var value = this.reader.read(bytes); + return HttpResponseEntity.of(response.code(), response.headers().toMutable(), value); + } catch (InterruptedException e) { + throw new HttpClientUnknownException(e); + } catch (ExecutionException e) { + if (e.getCause() instanceof HttpClientException he) { + throw he; + } + if (e.getCause() != null) { + throw new HttpClientUnknownException(e.getCause()); + } + throw new HttpClientUnknownException(e); + } + } + } +} diff --git a/avro/avro-module/src/main/java/ru/tinkoff/kora/avro/module/http/client/AvroHttpClientResponseMapper.java b/avro/avro-module/src/main/java/ru/tinkoff/kora/avro/module/http/client/AvroHttpClientResponseMapper.java new file mode 100644 index 000000000..19f1c9096 --- /dev/null +++ b/avro/avro-module/src/main/java/ru/tinkoff/kora/avro/module/http/client/AvroHttpClientResponseMapper.java @@ -0,0 +1,46 @@ +package ru.tinkoff.kora.avro.module.http.client; + +import org.apache.avro.specific.SpecificRecord; +import ru.tinkoff.kora.avro.common.AvroReader; +import ru.tinkoff.kora.http.client.common.HttpClientException; +import ru.tinkoff.kora.http.client.common.HttpClientUnknownException; +import ru.tinkoff.kora.http.client.common.response.HttpClientResponse; +import ru.tinkoff.kora.http.client.common.response.HttpClientResponseMapper; +import ru.tinkoff.kora.json.common.JsonReader; + +import java.io.IOException; +import java.util.concurrent.ExecutionException; + +public class AvroHttpClientResponseMapper implements HttpClientResponseMapper { + + private final AvroReader reader; + + public AvroHttpClientResponseMapper(AvroReader reader) { + this.reader = reader; + } + + @Override + public T apply(HttpClientResponse response) throws IOException { + try (var body = response.body(); + var is = body.asInputStream()) { + if (is != null) { + return this.reader.read(is); + } + + try { + var bytes = body.asArrayStage().toCompletableFuture().get(); + return this.reader.read(bytes); + } catch (InterruptedException e) { + throw new HttpClientUnknownException(e); + } catch (ExecutionException e) { + if (e.getCause() instanceof HttpClientException he) { + throw he; + } + if (e.getCause() != null) { + throw new HttpClientUnknownException(e.getCause()); + } + throw new HttpClientUnknownException(e); + } + } + } +} diff --git a/avro/avro-module/src/main/java/ru/tinkoff/kora/avro/module/http/server/AvroAsyncHttpServerRequestMapper.java b/avro/avro-module/src/main/java/ru/tinkoff/kora/avro/module/http/server/AvroAsyncHttpServerRequestMapper.java new file mode 100644 index 000000000..d6e215a7b --- /dev/null +++ b/avro/avro-module/src/main/java/ru/tinkoff/kora/avro/module/http/server/AvroAsyncHttpServerRequestMapper.java @@ -0,0 +1,45 @@ +package ru.tinkoff.kora.avro.module.http.server; + +import org.apache.avro.specific.SpecificRecord; +import ru.tinkoff.kora.avro.common.AvroReader; +import ru.tinkoff.kora.common.util.ByteBufferInputStream; +import ru.tinkoff.kora.common.util.FlowUtils; +import ru.tinkoff.kora.http.server.common.HttpServerRequest; +import ru.tinkoff.kora.http.server.common.handler.HttpServerRequestMapper; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; + +public final class AvroAsyncHttpServerRequestMapper implements HttpServerRequestMapper> { + + private final AvroReader reader; + + public AvroAsyncHttpServerRequestMapper(AvroReader reader) { + this.reader = reader; + } + + @Override + public CompletionStage apply(HttpServerRequest request) throws IOException { + var body = request.body(); + var fullContent = body.getFullContentIfAvailable(); + if (fullContent != null) { + try (body) { + if (fullContent.hasArray()) { + return CompletableFuture.completedFuture(this.reader.read(fullContent.array(), fullContent.arrayOffset(), fullContent.remaining())); + } else { + return CompletableFuture.completedFuture(this.reader.read(new ByteBufferInputStream(fullContent))); + } + } + } + + return FlowUtils.toByteArrayFuture(request.body()).thenApply(bytes -> { + try { + return this.reader.read(bytes); + } catch (IOException e) { + throw new CompletionException(e); + } + }); + } +} diff --git a/avro/avro-module/src/main/java/ru/tinkoff/kora/avro/module/http/server/AvroHttpServerRequestMapper.java b/avro/avro-module/src/main/java/ru/tinkoff/kora/avro/module/http/server/AvroHttpServerRequestMapper.java new file mode 100644 index 000000000..3f9ededbd --- /dev/null +++ b/avro/avro-module/src/main/java/ru/tinkoff/kora/avro/module/http/server/AvroHttpServerRequestMapper.java @@ -0,0 +1,49 @@ +package ru.tinkoff.kora.avro.module.http.server; + +import org.apache.avro.specific.SpecificRecord; +import ru.tinkoff.kora.avro.common.AvroReader; +import ru.tinkoff.kora.common.util.ByteBufferInputStream; +import ru.tinkoff.kora.http.server.common.HttpServerRequest; +import ru.tinkoff.kora.http.server.common.HttpServerResponseException; +import ru.tinkoff.kora.http.server.common.handler.HttpServerRequestMapper; + +import java.io.IOException; +import java.util.concurrent.ExecutionException; + +public final class AvroHttpServerRequestMapper implements HttpServerRequestMapper { + + private final AvroReader reader; + + public AvroHttpServerRequestMapper(AvroReader reader) { + this.reader = reader; + } + + @Override + public T apply(HttpServerRequest request) throws IOException { + try (var body = request.body()) { + var fullContent = body.getFullContentIfAvailable(); + if (fullContent != null) { + if (fullContent.hasArray()) { + return this.reader.read(fullContent.array(), fullContent.arrayOffset(), fullContent.remaining()); + } else { + return this.reader.read(new ByteBufferInputStream(fullContent)); + } + } + + try (var is = body.asInputStream()) { + if (is != null) { + return this.reader.read(is); + } + } + + try { + var bytes = body.asArrayStage().toCompletableFuture().get(); + return this.reader.read(bytes); + } catch (InterruptedException e) { + throw HttpServerResponseException.of(500, e); + } catch (ExecutionException e) { + throw HttpServerResponseException.of(500, e.getCause()); + } + } + } +} diff --git a/avro/avro-module/src/main/java/ru/tinkoff/kora/avro/module/kafka/KafkaAvroTypedDeserializer.java b/avro/avro-module/src/main/java/ru/tinkoff/kora/avro/module/kafka/KafkaAvroTypedDeserializer.java new file mode 100644 index 000000000..22c5f6aa5 --- /dev/null +++ b/avro/avro-module/src/main/java/ru/tinkoff/kora/avro/module/kafka/KafkaAvroTypedDeserializer.java @@ -0,0 +1,51 @@ +package ru.tinkoff.kora.avro.module.kafka; + +import org.apache.avro.specific.SpecificRecord; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.serialization.Deserializer; +import ru.tinkoff.kora.avro.common.AvroReader; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public class KafkaAvroTypedDeserializer implements Deserializer { + + protected static final byte MAGIC_BYTE = 0x0; + protected static final int ID_SIZE = 4; + + private final AvroReader avroReader; + + public KafkaAvroTypedDeserializer(AvroReader avroReader) { + this.avroReader = avroReader; + } + + @Override + public T deserialize(String topic, byte[] bytes) { + return deserialize(bytes); + } + + @Override + public T deserialize(String topic, Headers headers, byte[] bytes) { + return deserialize(bytes); + } + + protected T deserialize(byte[] payload) throws SerializationException { + if (payload == null || payload.length == 0) { + return null; + } + + return read(payload); + } + + private T read(byte[] payload) { + try { + int offset = 1 + ID_SIZE; + ByteBuffer buffer = ByteBuffer.wrap(payload, offset, payload.length - offset); + return avroReader.read(buffer); + } catch (IOException ex) { + String schemaId = new String(payload, 0, 1 + ID_SIZE); + throw new SerializationException("Error deserializing Avro message for id " + schemaId, ex.getCause()); + } + } +} diff --git a/avro/avro-module/src/main/java/ru/tinkoff/kora/avro/module/kafka/KafkaAvroTypedSerializer.java b/avro/avro-module/src/main/java/ru/tinkoff/kora/avro/module/kafka/KafkaAvroTypedSerializer.java new file mode 100644 index 000000000..e05437a8d --- /dev/null +++ b/avro/avro-module/src/main/java/ru/tinkoff/kora/avro/module/kafka/KafkaAvroTypedSerializer.java @@ -0,0 +1,63 @@ +package ru.tinkoff.kora.avro.module.kafka; + +import io.confluent.kafka.schemaregistry.avro.AvroSchema; +import org.apache.avro.specific.SpecificRecord; +import org.apache.kafka.common.errors.InvalidConfigurationException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.serialization.Serializer; +import ru.tinkoff.kora.avro.common.AvroWriter; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.nio.ByteBuffer; + +public class KafkaAvroTypedSerializer implements Serializer { + + protected static final byte MAGIC_BYTE = 0x0; + protected static final int ID_SIZE = 4; + + private final AvroWriter avroWriter; + + public KafkaAvroTypedSerializer(AvroWriter avroWriter) { + this.avroWriter = avroWriter; + } + + @Override + public byte[] serialize(String topic, T data) { + return this.serialize(topic, null, data); + } + + @Override + public byte[] serialize(String topic, Headers headers, T record) { + if (record == null) { + return null; + } + + AvroSchema schema = new AvroSchema(record.getSchema()); + return serializeImpl(record, schema); + } + + protected byte[] serializeImpl(T object, AvroSchema schema) throws SerializationException, InvalidConfigurationException { + if (object == null) { + return null; + } + + try { + int id = schema.version(); + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + out.write(MAGIC_BYTE); + out.write(ByteBuffer.allocate(ID_SIZE).putInt(id).array()); + out.write(avroWriter.writeBytesUnchecked(object)); + return out.toByteArray(); + } + } catch (InterruptedIOException e) { + throw new TimeoutException("Error serializing Avro message", e); + } catch (IOException | RuntimeException e) { + // avro serialization can throw AvroRuntimeException, NullPointerException, ClassCastException, etc + throw new SerializationException("Error serializing Avro message", e); + } + } +} diff --git a/avro/avro-symbol-processor/build.gradle b/avro/avro-symbol-processor/build.gradle new file mode 100644 index 000000000..c0faecbf1 --- /dev/null +++ b/avro/avro-symbol-processor/build.gradle @@ -0,0 +1,54 @@ +import org.apache.avro.tool.SpecificCompilerTool + +buildscript { + dependencies { + classpath "org.apache.avro:avro-tools:1.12.0" + } +} + +apply from: "${project.rootDir}/gradle/kotlin-plugin.gradle" +apply from: "${project.rootDir}/gradle/in-test-generated.gradle" + +dependencies { + api project(':symbol-processor-common') + api project(':kora-app-symbol-processor') + + testImplementation project(':avro:avro-common') + testImplementation testFixtures(project(':symbol-processor-common')) + + testImplementation libs.classgraph + testImplementation libs.kotlin.compiler + testImplementation libs.kotlin.compiler + testImplementation libs.junit.jupiter + testImplementation libs.ksp + testImplementation libs.ksp.api + testImplementation libs.kotlin.coroutines.reactor + testImplementation libs.kotlin.coroutines.jdk8 +} + +tasks.register("generateAvroClasses") { + group("build") + + var inputDir = "$projectDir/src/test/resources/avro" + var outputDir = "$buildDir/generated/sources/avro" + inputs.dir(inputDir) + outputs.dir(outputDir) + logging.captureStandardOutput(LogLevel.INFO) + logging.captureStandardError(LogLevel.ERROR) + + doFirst { + delete outputDir + delete "$buildDir/in-test-generated-ksp/sources/tinkoff/kora" + } + + doLast { + var params = ["-bigDecimal", "schema", inputDir.toString(), outputDir.toString()] + new SpecificCompilerTool().run(System.in, System.out, System.err, params) + } +} + +sourceSets.test.java { + srcDir tasks.generateAvroClasses +} + +test.dependsOn(tasks.generateAvroClasses) diff --git a/avro/avro-symbol-processor/src/main/kotlin/ru/tinkoff/kora/avro/symbol/processor/AvroSymbolProcessor.kt b/avro/avro-symbol-processor/src/main/kotlin/ru/tinkoff/kora/avro/symbol/processor/AvroSymbolProcessor.kt new file mode 100644 index 000000000..0ebf08362 --- /dev/null +++ b/avro/avro-symbol-processor/src/main/kotlin/ru/tinkoff/kora/avro/symbol/processor/AvroSymbolProcessor.kt @@ -0,0 +1,58 @@ +package ru.tinkoff.kora.avro.symbol.processor + +import com.google.devtools.ksp.processing.CodeGenerator +import com.google.devtools.ksp.processing.Resolver +import com.google.devtools.ksp.processing.SymbolProcessorEnvironment +import com.google.devtools.ksp.symbol.KSAnnotated +import com.google.devtools.ksp.symbol.KSClassDeclaration +import ru.tinkoff.kora.avro.symbol.processor.reader.AvroReaderGenerator +import ru.tinkoff.kora.avro.symbol.processor.writer.AvroWriterGenerator +import ru.tinkoff.kora.ksp.common.AnnotationUtils.isAnnotationPresent +import ru.tinkoff.kora.ksp.common.BaseSymbolProcessor +import ru.tinkoff.kora.ksp.common.exception.ProcessingErrorException + +class AvroSymbolProcessor( + environment: SymbolProcessorEnvironment +) : BaseSymbolProcessor(environment) { + private val processedReaders = HashSet() + private val processedWriters = HashSet() + private val codeGenerator: CodeGenerator = environment.codeGenerator + + private fun getSupportedAnnotationTypes() = setOf( + AvroTypes.avroBinary.canonicalName, + AvroTypes.avroJson.canonicalName, + ) + + override fun processRound(resolver: Resolver): List { + val symbolsToProcess = getSupportedAnnotationTypes() + .map { resolver.getSymbolsWithAnnotation(it).toList() } + .flatten() + .distinct() + + val readerGenerator = AvroReaderGenerator(resolver, codeGenerator) + val writerGenerator = AvroWriterGenerator(resolver, codeGenerator) + for (it in symbolsToProcess) { + try { + when (it) { + is KSClassDeclaration -> { + if (it.isAnnotationPresent(AvroTypes.avroBinary)) { + if (processedReaders.add(it.qualifiedName!!.asString())) { + readerGenerator.generateBinary(it) + writerGenerator.generateBinary(it) + } + } else if (it.isAnnotationPresent(AvroTypes.avroJson)) { + if (processedWriters.add(it.qualifiedName!!.asString())) { + readerGenerator.generateJson(it) + writerGenerator.generateJson(it) + } + } + } + } + } catch (e: ProcessingErrorException) { + e.printError(kspLogger) + } + } + return listOf() + } +} + diff --git a/avro/avro-symbol-processor/src/main/kotlin/ru/tinkoff/kora/avro/symbol/processor/AvroSymbolProcessorProvider.kt b/avro/avro-symbol-processor/src/main/kotlin/ru/tinkoff/kora/avro/symbol/processor/AvroSymbolProcessorProvider.kt new file mode 100644 index 000000000..ab3e7c465 --- /dev/null +++ b/avro/avro-symbol-processor/src/main/kotlin/ru/tinkoff/kora/avro/symbol/processor/AvroSymbolProcessorProvider.kt @@ -0,0 +1,10 @@ +package ru.tinkoff.kora.avro.symbol.processor + +import com.google.devtools.ksp.processing.SymbolProcessorEnvironment +import com.google.devtools.ksp.processing.SymbolProcessorProvider + +class AvroSymbolProcessorProvider : SymbolProcessorProvider { + override fun create(environment: SymbolProcessorEnvironment): AvroSymbolProcessor { + return AvroSymbolProcessor(environment) + } +} diff --git a/avro/avro-symbol-processor/src/main/kotlin/ru/tinkoff/kora/avro/symbol/processor/AvroTypes.kt b/avro/avro-symbol-processor/src/main/kotlin/ru/tinkoff/kora/avro/symbol/processor/AvroTypes.kt new file mode 100644 index 000000000..768a9e166 --- /dev/null +++ b/avro/avro-symbol-processor/src/main/kotlin/ru/tinkoff/kora/avro/symbol/processor/AvroTypes.kt @@ -0,0 +1,19 @@ +package ru.tinkoff.kora.avro.symbol.processor + +import com.squareup.kotlinpoet.ClassName + +object AvroTypes { + + val avroBinary = ClassName("ru.tinkoff.kora.avro.common.annotation", "AvroBinary") + val avroJson = ClassName("ru.tinkoff.kora.avro.common.annotation", "AvroJson") + + val reader = ClassName("ru.tinkoff.kora.avro.common", "AvroReader") + val writer = ClassName("ru.tinkoff.kora.avro.common", "AvroWriter") + + val schema = ClassName("org.apache.avro", "Schema") + val specificData = ClassName("org.apache.avro.specific", "SpecificData") + val datumReader = ClassName("org.apache.avro.specific", "SpecificDatumReader") + val datumWriter = ClassName("org.apache.avro.specific", "SpecificDatumWriter") + val decoderFactory = ClassName("org.apache.avro.io", "DecoderFactory") + val encoderFactory = ClassName("org.apache.avro.io", "EncoderFactory") +} diff --git a/avro/avro-symbol-processor/src/main/kotlin/ru/tinkoff/kora/avro/symbol/processor/JsonUtils.kt b/avro/avro-symbol-processor/src/main/kotlin/ru/tinkoff/kora/avro/symbol/processor/JsonUtils.kt new file mode 100644 index 000000000..e79d635e3 --- /dev/null +++ b/avro/avro-symbol-processor/src/main/kotlin/ru/tinkoff/kora/avro/symbol/processor/JsonUtils.kt @@ -0,0 +1,14 @@ +package ru.tinkoff.kora.avro.symbol.processor + +import com.google.devtools.ksp.symbol.KSClassDeclaration +import ru.tinkoff.kora.ksp.common.generatedClassName + +fun classPackage(classDeclaration: KSClassDeclaration) = classDeclaration.packageName.asString() + +fun KSClassDeclaration.readerBinaryName() = this.generatedClassName("AvroBinaryReader") + +fun KSClassDeclaration.readerJsonName() = this.generatedClassName("AvroJsonReader") + +fun KSClassDeclaration.writerBinaryName() = this.generatedClassName("AvroBinaryWriter") + +fun KSClassDeclaration.writerJsonName() = this.generatedClassName("AvroJsonWriter") diff --git a/avro/avro-symbol-processor/src/main/kotlin/ru/tinkoff/kora/avro/symbol/processor/extension/AvroExtensionFactory.kt b/avro/avro-symbol-processor/src/main/kotlin/ru/tinkoff/kora/avro/symbol/processor/extension/AvroExtensionFactory.kt new file mode 100644 index 000000000..93c51f8f6 --- /dev/null +++ b/avro/avro-symbol-processor/src/main/kotlin/ru/tinkoff/kora/avro/symbol/processor/extension/AvroExtensionFactory.kt @@ -0,0 +1,21 @@ +package ru.tinkoff.kora.avro.symbol.processor.extension + +import com.google.devtools.ksp.getClassDeclarationByName +import com.google.devtools.ksp.processing.CodeGenerator +import com.google.devtools.ksp.processing.KSPLogger +import com.google.devtools.ksp.processing.Resolver +import ru.tinkoff.kora.avro.symbol.processor.AvroTypes +import ru.tinkoff.kora.kora.app.ksp.extension.ExtensionFactory +import ru.tinkoff.kora.kora.app.ksp.extension.KoraExtension + +class AvroExtensionFactory : ExtensionFactory { + + override fun create(resolver: Resolver, kspLogger: KSPLogger, codeGenerator: CodeGenerator): KoraExtension? { + val avro = resolver.getClassDeclarationByName(AvroTypes.avroBinary.canonicalName) + return if (avro == null) { + null + } else { + AvroKoraExtension(resolver, kspLogger, codeGenerator) + } + } +} diff --git a/avro/avro-symbol-processor/src/main/kotlin/ru/tinkoff/kora/avro/symbol/processor/extension/AvroKoraExtension.kt b/avro/avro-symbol-processor/src/main/kotlin/ru/tinkoff/kora/avro/symbol/processor/extension/AvroKoraExtension.kt new file mode 100644 index 000000000..6f162e698 --- /dev/null +++ b/avro/avro-symbol-processor/src/main/kotlin/ru/tinkoff/kora/avro/symbol/processor/extension/AvroKoraExtension.kt @@ -0,0 +1,145 @@ +package ru.tinkoff.kora.avro.symbol.processor.extension + +import com.google.devtools.ksp.getClassDeclarationByName +import com.google.devtools.ksp.getConstructors +import com.google.devtools.ksp.processing.CodeGenerator +import com.google.devtools.ksp.processing.KSPLogger +import com.google.devtools.ksp.processing.Resolver +import com.google.devtools.ksp.symbol.KSClassDeclaration +import com.google.devtools.ksp.symbol.KSFunctionDeclaration +import com.google.devtools.ksp.symbol.KSType +import com.google.devtools.ksp.symbol.Modifier +import ru.tinkoff.kora.avro.symbol.processor.* +import ru.tinkoff.kora.avro.symbol.processor.reader.AvroReaderGenerator +import ru.tinkoff.kora.avro.symbol.processor.writer.AvroWriterGenerator +import ru.tinkoff.kora.kora.app.ksp.extension.ExtensionResult +import ru.tinkoff.kora.kora.app.ksp.extension.KoraExtension +import ru.tinkoff.kora.ksp.common.AnnotationUtils.isAnnotationPresent +import ru.tinkoff.kora.ksp.common.exception.ProcessingErrorException + +class AvroKoraExtension( + private val resolver: Resolver, + private val kspLogger: KSPLogger, + codeGenerator: CodeGenerator +) : KoraExtension { + private val writerErasure = resolver.getClassDeclarationByName(AvroTypes.writer.canonicalName)!!.asStarProjectedType() + private val readerErasure = resolver.getClassDeclarationByName(AvroTypes.reader.canonicalName)!!.asStarProjectedType() + private val readerGenerator = AvroReaderGenerator(resolver, codeGenerator) + private val writerGenerator = AvroWriterGenerator(resolver, codeGenerator) + + override fun getDependencyGenerator(resolver: Resolver, type: KSType, tags: Set): (() -> ExtensionResult)? { + val isBinary = tags.isEmpty() || isBinary(tags) + val isJson = isJson(tags) + if (!isBinary && !isJson) { + return null + } + + val actualType = type.makeNotNullable() + val erasure = actualType.starProjection() + if (erasure == writerErasure) { + val possibleClass = type.arguments[0].type!!.resolve() + val possibleClassDeclaration = possibleClass.declaration + if (possibleClassDeclaration !is KSClassDeclaration + || possibleClassDeclaration.modifiers.contains(Modifier.ENUM) + || possibleClassDeclaration.modifiers.contains(Modifier.SEALED) + ) { + return null + } + + if (isBinary && possibleClassDeclaration.isAnnotationPresent(AvroTypes.avroBinary)) { + return generatedByProcessor(resolver, possibleClassDeclaration, "AvroBinaryWriter") + } + if (isJson && possibleClassDeclaration.isAnnotationPresent(AvroTypes.avroJson)) { + return generatedByProcessor(resolver, possibleClassDeclaration, "AvroJsonWriter") + } + + try { + return { generateWriter(resolver, possibleClassDeclaration, isBinary, tags) } + } catch (e: ProcessingErrorException) { + e.message?.let { kspLogger.warn(it, null) } + return null + } + } + + if (erasure == readerErasure) { + val possibleClass = type.arguments[0].type!!.resolve() + val possibleClassDeclaration = possibleClass.declaration + if (possibleClassDeclaration !is KSClassDeclaration + || possibleClassDeclaration.modifiers.contains(Modifier.ENUM) + || possibleClassDeclaration.modifiers.contains(Modifier.SEALED) + ) { + return null + } + + if (isBinary && possibleClassDeclaration.isAnnotationPresent(AvroTypes.avroBinary)) { + return generatedByProcessor(resolver, possibleClassDeclaration, "AvroBinaryReader") + } + if (isJson && possibleClassDeclaration.isAnnotationPresent(AvroTypes.avroJson)) { + return generatedByProcessor(resolver, possibleClassDeclaration, "AvroJsonReader") + } + + try { + return { generateReader(resolver, possibleClassDeclaration, isBinary, tags) } + } catch (e: ProcessingErrorException) { + return null + } + } + return null + } + + private fun isBinary(tags: Set): Boolean = tags == setOf(AvroTypes.avroBinary.canonicalName) + + private fun isJson(tags: Set): Boolean = tags == setOf(AvroTypes.avroJson.canonicalName) + + private fun generateReader(resolver: Resolver, declaration: KSClassDeclaration, isBinary: Boolean, tags: Set): ExtensionResult { + val packageElement = declaration.packageName.asString() + val resultClassName = if (isBinary) declaration.readerBinaryName() else declaration.readerJsonName() + val resultDeclaration = resolver.getClassDeclarationByName("$packageElement.$resultClassName") + if (resultDeclaration != null) { + return ExtensionResult.fromConstructor(findDefaultConstructor(resultDeclaration), resultDeclaration, tags) + } + + if (declaration.isAnnotationPresent(AvroTypes.avroBinary)) { + // annotation processor will handle that + return ExtensionResult.RequiresCompilingResult + } + + if (isBinary) { + readerGenerator.generateBinary(declaration) + } else { + readerGenerator.generateJson(declaration) + } + return ExtensionResult.RequiresCompilingResult + } + + private fun generateWriter(resolver: Resolver, declaration: KSClassDeclaration, isBinary: Boolean, tags: Set): ExtensionResult { + val packageElement = declaration.packageName.asString() + val resultClassName = if (isBinary) declaration.writerBinaryName() else declaration.writerJsonName() + val resultDeclaration = resolver.getClassDeclarationByName("$packageElement.$resultClassName") + if (resultDeclaration != null) { + return ExtensionResult.fromConstructor(findDefaultConstructor(resultDeclaration), resultDeclaration, tags) + } + + if (declaration.isAnnotationPresent(AvroTypes.avroJson)) { + // annotation processor will handle that + return ExtensionResult.RequiresCompilingResult + } + + if (isBinary) { + writerGenerator.generateBinary(declaration) + } else { + writerGenerator.generateJson(declaration) + } + return ExtensionResult.RequiresCompilingResult + } + + private fun findDefaultConstructor(resultElement: KSClassDeclaration): KSFunctionDeclaration { + return if (resultElement.primaryConstructor != null) { + resultElement.primaryConstructor!! + } else if (resultElement.getConstructors().count() == 1) { + resultElement.getConstructors().first() + } else { + throw ProcessingErrorException("No primary constructor found for: $resultElement", resultElement) + } + } +} diff --git a/avro/avro-symbol-processor/src/main/kotlin/ru/tinkoff/kora/avro/symbol/processor/reader/AvroReaderGenerator.kt b/avro/avro-symbol-processor/src/main/kotlin/ru/tinkoff/kora/avro/symbol/processor/reader/AvroReaderGenerator.kt new file mode 100644 index 000000000..1372e0fc2 --- /dev/null +++ b/avro/avro-symbol-processor/src/main/kotlin/ru/tinkoff/kora/avro/symbol/processor/reader/AvroReaderGenerator.kt @@ -0,0 +1,135 @@ +package ru.tinkoff.kora.avro.symbol.processor.reader + +import com.google.devtools.ksp.processing.CodeGenerator +import com.google.devtools.ksp.processing.Resolver +import com.google.devtools.ksp.symbol.KSClassDeclaration +import com.squareup.kotlinpoet.* +import com.squareup.kotlinpoet.ParameterizedTypeName.Companion.parameterizedBy +import com.squareup.kotlinpoet.jvm.throws +import com.squareup.kotlinpoet.ksp.addOriginatingKSFile +import com.squareup.kotlinpoet.ksp.toTypeParameterResolver +import com.squareup.kotlinpoet.ksp.toTypeVariableName +import com.squareup.kotlinpoet.ksp.writeTo +import ru.tinkoff.kora.avro.symbol.processor.AvroTypes +import ru.tinkoff.kora.avro.symbol.processor.classPackage +import ru.tinkoff.kora.avro.symbol.processor.readerBinaryName +import ru.tinkoff.kora.avro.symbol.processor.readerJsonName +import ru.tinkoff.kora.ksp.common.KspCommonUtils.generated +import ru.tinkoff.kora.ksp.common.KspCommonUtils.toTypeName +import java.io.IOException +import java.io.InputStream + +class AvroReaderGenerator(val resolver: Resolver, private val codeGenerator: CodeGenerator) { + + fun generateBinary(declaration: KSClassDeclaration) { + val typeName = declaration.toTypeName() + val typeParameterResolver = declaration.typeParameters.toTypeParameterResolver() + val readerInterface = AvroTypes.reader.parameterizedBy(typeName) + val typeBuilder = TypeSpec.classBuilder(declaration.readerBinaryName()) + .generated(AvroReaderGenerator::class) + declaration.containingFile?.let { typeBuilder.addOriginatingKSFile(it) } + + typeBuilder.addSuperinterface(readerInterface) + + declaration.typeParameters.forEach { + typeBuilder.addTypeVariable(it.toTypeVariableName(typeParameterResolver)) + } + + typeBuilder.addProperty( + PropertySpec.builder("SCHEMA", AvroTypes.schema) + .addModifiers(KModifier.PRIVATE, KModifier.FINAL) + .initializer("%T.getClassSchema()", typeName) + .build() + ) + typeBuilder.addProperty( + PropertySpec.builder("SPECIFIC_DATA", AvroTypes.specificData) + .addModifiers(KModifier.PRIVATE, KModifier.FINAL) + .initializer("%T().getSpecificData()", typeName) + .build() + ) + typeBuilder.addProperty( + PropertySpec.builder("READER", AvroTypes.datumReader.parameterizedBy(typeName)) + .addModifiers(KModifier.PRIVATE, KModifier.FINAL) + .initializer("%T(SCHEMA, SCHEMA, SPECIFIC_DATA)", AvroTypes.datumReader) + .build() + ) + + val method = FunSpec.builder("read") + .addModifiers(KModifier.PUBLIC, KModifier.FINAL, KModifier.OVERRIDE) + .throws(IOException::class) + .addParameter("value", InputStream::class.asTypeName().copy(true)) + .returns(typeName.copy(true)) + method.beginControlFlow("if (value == null || value.available() == 0)") + method.addStatement("return null") + method.endControlFlow() + method.addStatement("val decoder = %T.get().directBinaryDecoder(value, null)", AvroTypes.decoderFactory) + method.addStatement("return READER.read(%T(), decoder)", typeName) + + typeBuilder.addFunction(method.build()) + val spec = typeBuilder.build() + + val packageElement = classPackage(declaration) + val fileSpec = FileSpec.builder( + packageName = packageElement, + fileName = spec.name!! + ) + fileSpec.addType(spec) + fileSpec.build().writeTo(codeGenerator = codeGenerator, aggregating = false) + } + + fun generateJson(declaration: KSClassDeclaration) { + val typeName = declaration.toTypeName() + val typeParameterResolver = declaration.typeParameters.toTypeParameterResolver() + val readerInterface = AvroTypes.reader.parameterizedBy(typeName) + val typeBuilder = TypeSpec.classBuilder(declaration.readerJsonName()) + .generated(AvroReaderGenerator::class) + declaration.containingFile?.let { typeBuilder.addOriginatingKSFile(it) } + + typeBuilder.addSuperinterface(readerInterface) + + declaration.typeParameters.forEach { + typeBuilder.addTypeVariable(it.toTypeVariableName(typeParameterResolver)) + } + + typeBuilder.addProperty( + PropertySpec.builder("SCHEMA", AvroTypes.schema) + .addModifiers(KModifier.PRIVATE, KModifier.FINAL) + .initializer("%T.getClassSchema()", typeName) + .build() + ) + typeBuilder.addProperty( + PropertySpec.builder("SPECIFIC_DATA", AvroTypes.specificData) + .addModifiers(KModifier.PRIVATE, KModifier.FINAL) + .initializer("%T().getSpecificData()", typeName) + .build() + ) + typeBuilder.addProperty( + PropertySpec.builder("READER", AvroTypes.datumReader.parameterizedBy(typeName)) + .addModifiers(KModifier.PRIVATE, KModifier.FINAL) + .initializer("%T(SCHEMA, SCHEMA, SPECIFIC_DATA)", AvroTypes.datumReader) + .build() + ) + + val method = FunSpec.builder("read") + .addModifiers(KModifier.PUBLIC, KModifier.FINAL, KModifier.OVERRIDE) + .throws(IOException::class) + .addParameter("value", InputStream::class.asTypeName().copy(true)) + .returns(typeName.copy(true)) + method.beginControlFlow("if (value == null || value.available() == 0)") + method.addStatement("return null") + method.endControlFlow() + method.addStatement("val decoder = %T.get().jsonDecoder(SCHEMA, value)", AvroTypes.decoderFactory) + method.addStatement("return READER.read(%T(), decoder)", typeName) + + typeBuilder.addFunction(method.build()) + val spec = typeBuilder.build() + + val packageElement = classPackage(declaration) + val fileSpec = FileSpec.builder( + packageName = packageElement, + fileName = spec.name!! + ) + fileSpec.addType(spec) + fileSpec.build().writeTo(codeGenerator = codeGenerator, aggregating = false) + } +} diff --git a/avro/avro-symbol-processor/src/main/kotlin/ru/tinkoff/kora/avro/symbol/processor/writer/AvroWriterGenerator.kt b/avro/avro-symbol-processor/src/main/kotlin/ru/tinkoff/kora/avro/symbol/processor/writer/AvroWriterGenerator.kt new file mode 100644 index 000000000..63fb4af96 --- /dev/null +++ b/avro/avro-symbol-processor/src/main/kotlin/ru/tinkoff/kora/avro/symbol/processor/writer/AvroWriterGenerator.kt @@ -0,0 +1,153 @@ +package ru.tinkoff.kora.avro.symbol.processor.writer + +import com.google.devtools.ksp.processing.CodeGenerator +import com.google.devtools.ksp.processing.Resolver +import com.google.devtools.ksp.symbol.KSClassDeclaration +import com.squareup.kotlinpoet.* +import com.squareup.kotlinpoet.ParameterizedTypeName.Companion.parameterizedBy +import com.squareup.kotlinpoet.jvm.throws +import com.squareup.kotlinpoet.ksp.addOriginatingKSFile +import com.squareup.kotlinpoet.ksp.toTypeParameterResolver +import com.squareup.kotlinpoet.ksp.toTypeVariableName +import com.squareup.kotlinpoet.ksp.writeTo +import ru.tinkoff.kora.avro.symbol.processor.AvroTypes +import ru.tinkoff.kora.avro.symbol.processor.classPackage +import ru.tinkoff.kora.avro.symbol.processor.writerBinaryName +import ru.tinkoff.kora.avro.symbol.processor.writerJsonName +import ru.tinkoff.kora.ksp.common.KspCommonUtils.generated +import ru.tinkoff.kora.ksp.common.KspCommonUtils.toTypeName +import java.io.ByteArrayOutputStream +import java.io.IOException + +class AvroWriterGenerator(val resolver: Resolver, val codeGenerator: CodeGenerator) { + + fun generateBinary(declaration: KSClassDeclaration) { + val typeName = declaration.toTypeName() + val typeParameterResolver = declaration.typeParameters.toTypeParameterResolver() + val writerInterface = AvroTypes.writer.parameterizedBy(typeName) + val typeBuilder = TypeSpec.classBuilder(declaration.writerBinaryName()) + .generated(AvroWriterGenerator::class) + declaration.containingFile?.let { typeBuilder.addOriginatingKSFile(it) } + typeBuilder.addSuperinterface(writerInterface) + + declaration.typeParameters.forEach { + typeBuilder.addTypeVariable(it.toTypeVariableName(typeParameterResolver)) + } + + typeBuilder.addProperty( + PropertySpec.builder("EMPTY", ByteArray::class) + .addModifiers(KModifier.PRIVATE, KModifier.FINAL) + .initializer("%T(0)", ByteArray::class) + .build() + ) + typeBuilder.addProperty( + PropertySpec.builder("SCHEMA", AvroTypes.schema) + .addModifiers(KModifier.PRIVATE, KModifier.FINAL) + .initializer("%T.getClassSchema()", typeName) + .build() + ) + typeBuilder.addProperty( + PropertySpec.builder("SPECIFIC_DATA", AvroTypes.specificData) + .addModifiers(KModifier.PRIVATE, KModifier.FINAL) + .initializer("%T().getSpecificData()", typeName) + .build() + ) + typeBuilder.addProperty( + PropertySpec.builder("WRITER", AvroTypes.datumWriter.parameterizedBy(typeName)) + .addModifiers(KModifier.PRIVATE, KModifier.FINAL) + .initializer("%T(SCHEMA, SPECIFIC_DATA)", AvroTypes.datumWriter) + .build() + ) + + val method = FunSpec.builder("writeBytes") + .addModifiers(KModifier.PUBLIC, KModifier.FINAL, KModifier.OVERRIDE) + .throws(IOException::class) + .addParameter("value", typeName.copy(true)) + .returns(ByteArray::class) + method.beginControlFlow("if (value == null)") + method.addStatement("return EMPTY") + method.endControlFlow() + method.beginControlFlow("return %T().%M", ByteArrayOutputStream::class, MemberName("kotlin.io", "use")) + method.addStatement("val encoder = %T.get().directBinaryEncoder(it, null)", AvroTypes.encoderFactory) + method.addStatement("WRITER.write(value, encoder)", typeName) + method.addStatement("encoder.flush()") + method.addStatement("it.toByteArray()") + method.endControlFlow() + + typeBuilder.addFunction(method.build()) + val spec = typeBuilder.build() + + val packageElement = classPackage(declaration) + val fileSpec = FileSpec.builder( + packageName = packageElement, + fileName = spec.name!! + ) + fileSpec.addType(spec) + fileSpec.build().writeTo(codeGenerator = codeGenerator, aggregating = false) + } + + fun generateJson(declaration: KSClassDeclaration) { + val typeName = declaration.toTypeName() + val typeParameterResolver = declaration.typeParameters.toTypeParameterResolver() + val writerInterface = AvroTypes.writer.parameterizedBy(typeName) + val typeBuilder = TypeSpec.classBuilder(declaration.writerJsonName()) + .generated(AvroWriterGenerator::class) + declaration.containingFile?.let { typeBuilder.addOriginatingKSFile(it) } + typeBuilder.addSuperinterface(writerInterface) + + declaration.typeParameters.forEach { + typeBuilder.addTypeVariable(it.toTypeVariableName(typeParameterResolver)) + } + + typeBuilder.addProperty( + PropertySpec.builder("EMPTY", ByteArray::class) + .addModifiers(KModifier.PRIVATE, KModifier.FINAL) + .initializer("%T(0)", ByteArray::class) + .build() + ) + typeBuilder.addProperty( + PropertySpec.builder("SCHEMA", AvroTypes.schema) + .addModifiers(KModifier.PRIVATE, KModifier.FINAL) + .initializer("%T.getClassSchema()", typeName) + .build() + ) + typeBuilder.addProperty( + PropertySpec.builder("SPECIFIC_DATA", AvroTypes.specificData) + .addModifiers(KModifier.PRIVATE, KModifier.FINAL) + .initializer("%T().getSpecificData()", typeName) + .build() + ) + typeBuilder.addProperty( + PropertySpec.builder("WRITER", AvroTypes.datumWriter.parameterizedBy(typeName)) + .addModifiers(KModifier.PRIVATE, KModifier.FINAL) + .initializer("%T(SCHEMA, SPECIFIC_DATA)", AvroTypes.datumWriter) + .build() + ) + + val method = FunSpec.builder("writeBytes") + .addModifiers(KModifier.PUBLIC, KModifier.FINAL, KModifier.OVERRIDE) + .throws(IOException::class) + .addParameter("value", typeName.copy(true)) + .returns(ByteArray::class) + method.beginControlFlow("if (value == null)") + method.addStatement("return EMPTY") + method.endControlFlow() + method.beginControlFlow("return %T().%M", ByteArrayOutputStream::class, MemberName("kotlin.io", "use")) + method.addStatement("val encoder = %T.get().jsonEncoder(SCHEMA, it)", AvroTypes.encoderFactory) + method.addStatement("WRITER.write(value, encoder)", typeName) + method.addStatement("encoder.flush()") + method.addStatement("it.toByteArray()") + method.endControlFlow() + + typeBuilder.addFunction(method.build()) + val spec = typeBuilder.build() + + val packageElement = classPackage(declaration) + val fileSpec = FileSpec.builder( + packageName = packageElement, + fileName = spec.name!! + ) + fileSpec.addType(spec) + fileSpec.build().writeTo(codeGenerator = codeGenerator, aggregating = false) + } +} diff --git a/avro/avro-symbol-processor/src/main/resources/META-INF/services/com.google.devtools.ksp.processing.SymbolProcessorProvider b/avro/avro-symbol-processor/src/main/resources/META-INF/services/com.google.devtools.ksp.processing.SymbolProcessorProvider new file mode 100644 index 000000000..ad4e56fb4 --- /dev/null +++ b/avro/avro-symbol-processor/src/main/resources/META-INF/services/com.google.devtools.ksp.processing.SymbolProcessorProvider @@ -0,0 +1 @@ +ru.tinkoff.kora.avro.symbol.processor.AvroSymbolProcessorProvider diff --git a/avro/avro-symbol-processor/src/main/resources/META-INF/services/ru.tinkoff.kora.kora.app.ksp.extension.ExtensionFactory b/avro/avro-symbol-processor/src/main/resources/META-INF/services/ru.tinkoff.kora.kora.app.ksp.extension.ExtensionFactory new file mode 100644 index 000000000..8e7ded45b --- /dev/null +++ b/avro/avro-symbol-processor/src/main/resources/META-INF/services/ru.tinkoff.kora.kora.app.ksp.extension.ExtensionFactory @@ -0,0 +1 @@ +ru.tinkoff.kora.avro.symbol.processor.extension.AvroExtensionFactory diff --git a/avro/avro-symbol-processor/src/test/kotlin/ru/tinkoff/kora/avro/symbol/processor/AbstractAvroSymbolProcessorTest.kt b/avro/avro-symbol-processor/src/test/kotlin/ru/tinkoff/kora/avro/symbol/processor/AbstractAvroSymbolProcessorTest.kt new file mode 100644 index 000000000..7b9eb66a8 --- /dev/null +++ b/avro/avro-symbol-processor/src/test/kotlin/ru/tinkoff/kora/avro/symbol/processor/AbstractAvroSymbolProcessorTest.kt @@ -0,0 +1,208 @@ +package ru.tinkoff.kora.avro.symbol.processor + +import org.apache.avro.generic.IndexedRecord +import org.apache.avro.io.DecoderFactory +import org.apache.avro.io.Encoder +import org.apache.avro.io.EncoderFactory +import org.apache.avro.specific.SpecificData +import org.apache.avro.specific.SpecificDatumReader +import org.apache.avro.specific.SpecificDatumWriter +import org.assertj.core.api.Assertions +import ru.tinkoff.kora.avro.common.AvroReader +import ru.tinkoff.kora.avro.common.AvroWriter +import ru.tinkoff.kora.ksp.common.AbstractSymbolProcessorTest +import java.io.* +import java.nio.charset.StandardCharsets +import java.nio.file.Files +import java.time.Instant +import java.util.* + +abstract class AbstractAvroSymbolProcessorTest : AbstractSymbolProcessorTest() { + + override fun commonImports(): String { + return super.commonImports() + """ + import ru.tinkoff.kora.common.KoraApp; + import tinkoff.kora.binary.TestAvroBinary; + import tinkoff.kora.json.TestAvroJson; + import ru.tinkoff.kora.avro.common.annotation.*; + import ru.tinkoff.kora.avro.common.AvroReader; + import ru.tinkoff.kora.avro.common.AvroWriter; + """.trimIndent() + } + + protected fun getTestAvroBinaryGenerated(): IndexedRecord { + val testAvro = newGenerated("TestAvro").invoke() as IndexedRecord + testAvro.put(0, "cluster") + testAvro.put(1, Instant.EPOCH) + testAvro.put(2, "descr") + testAvro.put(3, 12345L) + testAvro.put(4, true) + return testAvro + } + + protected fun getTestAvroJsonGenerated(): IndexedRecord { + val testAvro = newGenerated("TestAvro").invoke() as IndexedRecord + testAvro.put(0, "cluster") + testAvro.put(1, Instant.EPOCH) + testAvro.put(2, "descr") + testAvro.put(3, 12345L) + testAvro.put(4, true) + return testAvro + } + + protected fun getTestAvroAsBytes(): ByteArray { + return Base64.getDecoder().decode("DmNsdXN0ZXICAAIKZGVzY3IC8sABAgE=") + } + + protected fun getTestAvroAsJson(): String { + return "" + } + + protected fun assertThatTestAvroValid(expected: IndexedRecord, actual: IndexedRecord) { + Assertions.assertThat(actual).isNotNull() + Assertions.assertThat(actual[0].toString()).isEqualTo(actual[0].toString()) + Assertions.assertThat(actual[1]).isEqualTo(expected[1]) + Assertions.assertThat(actual[2].toString()).isEqualTo(actual[2].toString()) + Assertions.assertThat(actual[3]).isEqualTo(expected[3]) + Assertions.assertThat(actual[4]).isEqualTo(expected[4]) + } + + protected fun getAvroJavaClass(): String { + try { + val strings = Files.lines(File("build/generated/sources/avro/tinkoff/kora/TestAvro.java").toPath()) + .map { s -> s.replace("tinkoff.kora.", "") } + .toList() + val avro = java.lang.String.join("\n", strings.subList(7, strings.size)) + return avro.replace("tinkoff.kora", testPackage()) + } catch (e: IOException) { + throw RuntimeException(e) + } + } + + protected open fun readerBinaryClass(forClass: String) = compileResult.classLoader.readerBinaryClass(testPackage(), forClass) + protected open fun writerBinaryClass(forClass: String) = compileResult.classLoader.writerBinaryClass(testPackage(), forClass) + protected open fun readerJsonClass(forClass: String) = compileResult.classLoader.readerJsonClass(testPackage(), forClass) + protected open fun writerJsonClass(forClass: String) = compileResult.classLoader.writerJsonClass(testPackage(), forClass) + + protected open fun readerBinary(forClass: String, vararg params: Any?) = compileResult.classLoader.readerBinary(testPackage(), forClass, *params) + protected open fun writerBinary(forClass: String, vararg params: Any?) = compileResult.classLoader.writerBinary(testPackage(), forClass, *params) + protected open fun readerJson(forClass: String, vararg params: Any?) = compileResult.classLoader.readerJson(testPackage(), forClass, *params) + protected open fun writerJson(forClass: String, vararg params: Any?) = compileResult.classLoader.writerJson(testPackage(), forClass, *params) + + protected open fun mapperClass(forClass: String) = compileResult.classLoader.mapper(testPackage(), forClass) + protected open fun mapper(forClass: String, readerParams: List<*>, writerParams: List<*>) = + compileResult.classLoader.mapper(testPackage(), forClass, readerParams, writerParams) + + class ReaderAndWriter(private val reader: AvroReader, private val writer: AvroWriter) : AvroReader, AvroWriter { + + override fun read(`is`: InputStream?): T = reader.read(`is`) + + override fun writeBytes(value: T): ByteArray = writer.writeBytes(value) + } + + companion object { + fun ClassLoader.mapper(packageName: String, forClass: String): ReaderAndWriter { + return mapper(packageName, forClass, listOf(), listOf()) + } + + fun ClassLoader.mapper(packageName: String, forClass: String, readerParams: List<*>, writerParams: List<*>): ReaderAndWriter { + val reader: AvroReader = readerBinary(packageName, forClass, *readerParams.toTypedArray()) + val writer: AvroWriter = writerBinary(packageName, forClass, *writerParams.toTypedArray()) + return ReaderAndWriter(reader, writer) + } + + fun ClassLoader.readerBinaryClass(packageName: String, forClass: String) = loadClass(packageName + ".$" + forClass + "_AvroBinaryReader")!! + + fun ClassLoader.readerJsonClass(packageName: String, forClass: String) = loadClass(packageName + ".$" + forClass + "_AvroJsonReader")!! + + fun ClassLoader.writerBinaryClass(packageName: String, forClass: String) = loadClass(packageName + ".$" + forClass + "_AvroBinaryWriter")!! + + fun ClassLoader.writerJsonClass(packageName: String, forClass: String) = loadClass(packageName + ".$" + forClass + "_AvroJsonWriter")!! + + fun ClassLoader.readerBinary(packageName: String, forClass: String, vararg params: Any?): AvroReader { + return readerBinaryClass(packageName, forClass) + .constructors[0] + .newInstance(*params) as AvroReader + } + + fun ClassLoader.writerBinary(packageName: String, forClass: String, vararg params: Any?): AvroWriter { + return writerBinaryClass(packageName, forClass) + .constructors[0] + .newInstance(*params) as AvroWriter + } + + fun ClassLoader.readerJson(packageName: String, forClass: String, vararg params: Any?): AvroReader { + return readerJsonClass(packageName, forClass) + .constructors[0] + .newInstance(*params) as AvroReader + } + + fun ClassLoader.writerJson(packageName: String, forClass: String, vararg params: Any?): AvroWriter { + return writerJsonClass(packageName, forClass) + .constructors[0] + .newInstance(*params) as AvroWriter + } + + fun ReaderAndWriter.assert(value: T, avro: String) { + this.assertWrite(value, avro) + this.assertRead(avro, value) + } + + fun AvroWriter.assertWrite(value: T, expectedAvro: String) { + Assertions.assertThat(this.writeBytes(value)).asString(StandardCharsets.UTF_8).isEqualTo(expectedAvro) + } + + fun AvroReader.assertRead(avro: String, expectedObject: T) { + Assertions.assertThat(this.read(avro.toByteArray())).isEqualTo(expectedObject) + } + } + + // json + protected fun writeAsJson(value: IndexedRecord): ByteArray { + try { + ByteArrayOutputStream().use { stream -> + val writer = SpecificDatumWriter(value.schema) + val jsonEncoder: Encoder = EncoderFactory.get().jsonEncoder(value.schema, stream) + writer.write(value, jsonEncoder) + jsonEncoder.flush() + return stream.toByteArray() + } + } catch (e: IOException) { + throw UncheckedIOException(e) + } + } + + // header + protected fun writeAsBinary(value: IndexedRecord): ByteArray { + try { + ByteArrayOutputStream().use { stream -> + val fieldData = value.javaClass.getDeclaredField("MODEL$") + fieldData.isAccessible = true + val data = fieldData[value] as SpecificData + + val writer = SpecificDatumWriter(value.schema, data) + val encoder: Encoder = EncoderFactory.get().directBinaryEncoder(stream, null) + writer.write(value, encoder) + encoder.flush() + return stream.toByteArray() + } + } catch (e: Exception) { + throw IllegalStateException(e) + } + } + + // fields + protected fun readAsBinary(value: ByteArray?): IndexedRecord { + try { + val fieldData = getTestAvroBinaryGenerated().javaClass.getDeclaredField("MODEL$") + fieldData.isAccessible = true + val data = fieldData[value] as SpecificData + + val reader = SpecificDatumReader(getTestAvroBinaryGenerated().schema, getTestAvroBinaryGenerated().schema, data) + val binaryDecoder = DecoderFactory.get().binaryDecoder(value, null) + return reader.read(null, binaryDecoder) as IndexedRecord + } catch (e: Exception) { + throw IllegalStateException(e) + } + } +} diff --git a/avro/avro-symbol-processor/src/test/kotlin/ru/tinkoff/kora/avro/symbol/processor/AvroBinaryTests.kt b/avro/avro-symbol-processor/src/test/kotlin/ru/tinkoff/kora/avro/symbol/processor/AvroBinaryTests.kt new file mode 100644 index 000000000..1cb4ce83d --- /dev/null +++ b/avro/avro-symbol-processor/src/test/kotlin/ru/tinkoff/kora/avro/symbol/processor/AvroBinaryTests.kt @@ -0,0 +1,43 @@ +package ru.tinkoff.kora.avro.symbol.processor + +import org.apache.avro.generic.IndexedRecord +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test +import ru.tinkoff.kora.avro.common.AvroReader +import ru.tinkoff.kora.avro.common.AvroWriter +import tinkoff.kora.binary.TestAvroBinary +import java.time.Instant + +class AvroBinaryTests : AbstractAvroSymbolProcessorTest() { + + @Test + fun testReaderAndWriterFromExtension() { + compile0( + """ + @KoraApp + interface TestApp { + @Root + fun root(@AvroBinary r: AvroReader, @AvroBinary w: AvroWriter) = "" + } + + """.trimIndent() + ) + + compileResult.assertSuccess() + val reader: AvroReader = compileResult.classLoader.readerBinary("tinkoff.kora.binary", "TestAvroBinary") + val writer: AvroWriter = compileResult.classLoader.writerBinary("tinkoff.kora.binary", "TestAvroBinary") + assertThat(reader).isNotNull() + assertThat(writer).isNotNull() + + val testAvro: IndexedRecord = TestAvroBinary.newBuilder() + .setCluster("cluster") + .setDate(Instant.EPOCH) + .setDescription("descr") + .setCounter(12345L) + .setFlag(true) + .build() + val write = writer.writeBytes(testAvro) + val read = reader.readUnchecked(write) + assertThatTestAvroValid(testAvro, read) + } +} diff --git a/avro/avro-symbol-processor/src/test/kotlin/ru/tinkoff/kora/avro/symbol/processor/AvroJsonTests.kt b/avro/avro-symbol-processor/src/test/kotlin/ru/tinkoff/kora/avro/symbol/processor/AvroJsonTests.kt new file mode 100644 index 000000000..674923d86 --- /dev/null +++ b/avro/avro-symbol-processor/src/test/kotlin/ru/tinkoff/kora/avro/symbol/processor/AvroJsonTests.kt @@ -0,0 +1,43 @@ +package ru.tinkoff.kora.avro.symbol.processor + +import org.apache.avro.generic.IndexedRecord +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test +import ru.tinkoff.kora.avro.common.AvroReader +import ru.tinkoff.kora.avro.common.AvroWriter +import tinkoff.kora.json.TestAvroJson +import java.time.Instant + +class AvroJsonTests : AbstractAvroSymbolProcessorTest() { + + @Test + fun testReaderAndWriterFromExtension() { + compile0( + """ + @KoraApp + interface TestApp { + @Root + fun root(@AvroJson r: AvroReader, @AvroJson w: AvroWriter) = "" + } + + """.trimIndent() + ) + + compileResult.assertSuccess() + val reader: AvroReader = compileResult.classLoader.readerJson("tinkoff.kora.json", "TestAvroJson") + val writer: AvroWriter = compileResult.classLoader.writerJson("tinkoff.kora.json", "TestAvroJson") + assertThat(reader).isNotNull() + assertThat(writer).isNotNull() + + val testAvro: IndexedRecord = TestAvroJson.newBuilder() + .setCluster("cluster") + .setDate(Instant.EPOCH) + .setDescription("descr") + .setCounter(12345L) + .setFlag(true) + .build() + val write = writer.writeBytes(testAvro) + val read = reader.readUnchecked(write) + assertThatTestAvroValid(testAvro, read) + } +} diff --git a/avro/avro-symbol-processor/src/test/resources/avro/TestAvroBinary.avsc b/avro/avro-symbol-processor/src/test/resources/avro/TestAvroBinary.avsc new file mode 100644 index 000000000..b287c8191 --- /dev/null +++ b/avro/avro-symbol-processor/src/test/resources/avro/TestAvroBinary.avsc @@ -0,0 +1,43 @@ +{ + "type": "record", + "name": "TestAvroBinary", + "namespace": "tinkoff.kora.binary", + "fields": [ + { + "name": "cluster", + "type": "string" + }, + { + "name": "date", + "type": [ + "null", + { + "type": "long", + "logicalType": "timestamp-millis" + } + ] + }, + { + "name": "description", + "type": [ + "null", + "string" + ] + }, + { + "name": "counter", + "type": [ + "null", + "long" + ] + }, + { + "name": "flag", + "type": [ + "null", + "boolean" + ], + "default": null + } + ] +} diff --git a/avro/avro-symbol-processor/src/test/resources/avro/TestAvroJson.avsc b/avro/avro-symbol-processor/src/test/resources/avro/TestAvroJson.avsc new file mode 100644 index 000000000..4483b847e --- /dev/null +++ b/avro/avro-symbol-processor/src/test/resources/avro/TestAvroJson.avsc @@ -0,0 +1,43 @@ +{ + "type": "record", + "name": "TestAvroJson", + "namespace": "tinkoff.kora.json", + "fields": [ + { + "name": "cluster", + "type": "string" + }, + { + "name": "date", + "type": [ + "null", + { + "type": "long", + "logicalType": "timestamp-millis" + } + ] + }, + { + "name": "description", + "type": [ + "null", + "string" + ] + }, + { + "name": "counter", + "type": [ + "null", + "long" + ] + }, + { + "name": "flag", + "type": [ + "null", + "boolean" + ], + "default": null + } + ] +} diff --git a/build.gradle b/build.gradle index d7f16c52f..46eb10eea 100644 --- a/build.gradle +++ b/build.gradle @@ -31,6 +31,13 @@ allprojects { version = System.getenv().getOrDefault("KORA_VERSION", "1.1.0-SNAPSHOT") repositories { mavenCentral() + maven { url = "https://packages.confluent.io/maven/" } + } + + buildscript { + repositories { + mavenCentral() + } } } diff --git a/dependencies.gradle b/dependencies.gradle index 3c8062b45..3934e14ad 100644 --- a/dependencies.gradle +++ b/dependencies.gradle @@ -133,6 +133,7 @@ dependencyResolutionManagement { DependencyResolutionManagement it -> library("mockserver-client", "org.mock-server", "mockserver-client-java").versionRef("mockserver") library("kafka-client", "org.apache.kafka", "kafka-clients").versionRef("kafka") + library("avro", "org.apache.avro", "avro").version("1.12.0") library("grpc-kotlin-stub", "io.grpc", "grpc-kotlin-stub").versionRef("grpc-kotlin") library("grpc-kotlin-gen", "io.grpc", "protoc-gen-grpc-kotlin").versionRef("grpc-kotlin") diff --git a/kafka/kafka/build.gradle b/kafka/kafka/build.gradle index 04f49ffd6..10b4c5736 100644 --- a/kafka/kafka/build.gradle +++ b/kafka/kafka/build.gradle @@ -1,11 +1,12 @@ apply from: "${project.rootDir}/gradle/kotlin-plugin.gradle" dependencies { + annotationProcessor project(":config:config-annotation-processor") + api project(":common") api project(":config:config-common") api project(":telemetry:telemetry-common") api libs.kafka.client - annotationProcessor project(":config:config-annotation-processor") testImplementation project(':internal:test-kafka') } diff --git a/kora-app-annotation-processor/src/main/java/ru/tinkoff/kora/kora/app/annotation/processor/declaration/ComponentDeclaration.java b/kora-app-annotation-processor/src/main/java/ru/tinkoff/kora/kora/app/annotation/processor/declaration/ComponentDeclaration.java index 323c069b3..bfb63f96e 100644 --- a/kora-app-annotation-processor/src/main/java/ru/tinkoff/kora/kora/app/annotation/processor/declaration/ComponentDeclaration.java +++ b/kora-app-annotation-processor/src/main/java/ru/tinkoff/kora/kora/app/annotation/processor/declaration/ComponentDeclaration.java @@ -210,6 +210,9 @@ static ComponentDeclaration fromExtension(ProcessingContext ctx, ExtensionResult if (tag.isEmpty()) { tag = TagUtils.parseTagValue(typeElement); } + if(tag.isEmpty()) { + tag = generatedResult.tags(); + } var type = typeElement.asType(); if (TypeParameterUtils.hasRawTypes(type)) { throw new ProcessingErrorException("Components with raw types can break dependency resolution in unpredictable way so they are forbidden", sourceMethod); diff --git a/kora-app-annotation-processor/src/main/java/ru/tinkoff/kora/kora/app/annotation/processor/extension/ExtensionResult.java b/kora-app-annotation-processor/src/main/java/ru/tinkoff/kora/kora/app/annotation/processor/extension/ExtensionResult.java index 81ee1534d..16e5085d5 100644 --- a/kora-app-annotation-processor/src/main/java/ru/tinkoff/kora/kora/app/annotation/processor/extension/ExtensionResult.java +++ b/kora-app-annotation-processor/src/main/java/ru/tinkoff/kora/kora/app/annotation/processor/extension/ExtensionResult.java @@ -13,18 +13,26 @@ public sealed interface ExtensionResult { static GeneratedResult fromExecutable(ExecutableElement constructor) { - return new ExtensionResult.GeneratedResult(constructor, (ExecutableType) constructor.asType()); + return new ExtensionResult.GeneratedResult(constructor, (ExecutableType) constructor.asType(), Set.of()); + } + + static GeneratedResult fromExecutable(ExecutableElement constructor, Set tags) { + return new ExtensionResult.GeneratedResult(constructor, (ExecutableType) constructor.asType(), tags); } static GeneratedResult fromExecutable(ExecutableElement executableElement, ExecutableType executableType) { - return new ExtensionResult.GeneratedResult(executableElement, executableType); + return new ExtensionResult.GeneratedResult(executableElement, executableType, Set.of()); + } + + static GeneratedResult fromExecutable(ExecutableElement executableElement, ExecutableType executableType, Set tags) { + return new ExtensionResult.GeneratedResult(executableElement, executableType, tags); } static ExtensionResult nextRound() { return RequiresCompilingResult.INSTANCE; } - record GeneratedResult(ExecutableElement sourceElement, ExecutableType targetType) implements ExtensionResult {} + record GeneratedResult(ExecutableElement sourceElement, ExecutableType targetType, Set tags) implements ExtensionResult {} record CodeBlockResult(Element source, Function codeBlock, TypeMirror componentType, Set componentTag, List dependencyTypes, List> dependencyTags) implements ExtensionResult { diff --git a/kora-app-symbol-processor/src/main/kotlin/ru/tinkoff/kora/kora/app/ksp/declaration/ComponentDeclaration.kt b/kora-app-symbol-processor/src/main/kotlin/ru/tinkoff/kora/kora/app/ksp/declaration/ComponentDeclaration.kt index b7f030e31..c922ca00b 100644 --- a/kora-app-symbol-processor/src/main/kotlin/ru/tinkoff/kora/kora/app/ksp/declaration/ComponentDeclaration.kt +++ b/kora-app-symbol-processor/src/main/kotlin/ru/tinkoff/kora/kora/app/ksp/declaration/ComponentDeclaration.kt @@ -177,7 +177,10 @@ sealed interface ComponentDeclaration { if (type.isError) { throw ProcessingErrorException("Component type is not resolvable in the current round of processing", sourceMethod) } - val tag = if (sourceMethod.isConstructor()) { + + val tag = if(extensionResult.tags.isNotEmpty()) + extensionResult.tags + else if (sourceMethod.isConstructor()) { sourceMethod.closestClassDeclaration()!!.parseTags() } else { sourceMethod.parseTags() diff --git a/kora-app-symbol-processor/src/main/kotlin/ru/tinkoff/kora/kora/app/ksp/extension/ExtensionResult.kt b/kora-app-symbol-processor/src/main/kotlin/ru/tinkoff/kora/kora/app/ksp/extension/ExtensionResult.kt index ce304214f..c8233e2af 100644 --- a/kora-app-symbol-processor/src/main/kotlin/ru/tinkoff/kora/kora/app/ksp/extension/ExtensionResult.kt +++ b/kora-app-symbol-processor/src/main/kotlin/ru/tinkoff/kora/kora/app/ksp/extension/ExtensionResult.kt @@ -4,7 +4,12 @@ import com.google.devtools.ksp.symbol.* import com.squareup.kotlinpoet.CodeBlock sealed interface ExtensionResult { - class GeneratedResult(val constructor: KSFunctionDeclaration, val type: KSFunction) : ExtensionResult + + class GeneratedResult( + val constructor: KSFunctionDeclaration, + val type: KSFunction, + val tags: Set + ) : ExtensionResult class CodeBlockResult( val source: KSDeclaration, @@ -12,26 +17,27 @@ sealed interface ExtensionResult { val componentType: KSType, val componentTag: Set, val dependencyTypes: List, - val dependencyTags: List>) : ExtensionResult { - } + val dependencyTags: List> + ) : ExtensionResult object RequiresCompilingResult : ExtensionResult companion object { fun fromConstructor(constructor: KSFunctionDeclaration, type: KSClassDeclaration): ExtensionResult { - return GeneratedResult( - constructor, - constructor.asMemberOf(type.asType(listOf())) - ) + return GeneratedResult(constructor, constructor.asMemberOf(type.asType(listOf())), setOf()) + } + + fun fromConstructor(constructor: KSFunctionDeclaration, type: KSClassDeclaration, tags: Set): ExtensionResult { + return GeneratedResult(constructor, constructor.asMemberOf(type.asType(listOf())), tags) } fun fromExecutable(constructor: KSFunctionDeclaration, type: KSFunction): ExtensionResult { - return GeneratedResult( - constructor, - type - ) + return GeneratedResult(constructor, type, setOf()) } - } + fun fromExecutable(constructor: KSFunctionDeclaration, type: KSFunction, tags: Set): ExtensionResult { + return GeneratedResult(constructor, type, tags) + } + } } diff --git a/kora-app-symbol-processor/src/main/kotlin/ru/tinkoff/kora/kora/app/ksp/extension/KoraExtension.kt b/kora-app-symbol-processor/src/main/kotlin/ru/tinkoff/kora/kora/app/ksp/extension/KoraExtension.kt index 538a03264..d24b28124 100644 --- a/kora-app-symbol-processor/src/main/kotlin/ru/tinkoff/kora/kora/app/ksp/extension/KoraExtension.kt +++ b/kora-app-symbol-processor/src/main/kotlin/ru/tinkoff/kora/kora/app/ksp/extension/KoraExtension.kt @@ -10,6 +10,7 @@ import ru.tinkoff.kora.ksp.common.exception.ProcessingErrorException import ru.tinkoff.kora.ksp.common.generatedClassName interface KoraExtension { + fun getDependencyGenerator(resolver: Resolver, type: KSType, tags: Set): (() -> ExtensionResult)? fun generatedByProcessor(resolver: Resolver, source: KSClassDeclaration, postfix: String): (() -> ExtensionResult)? { diff --git a/settings.gradle b/settings.gradle index 45c839258..1407d44e1 100644 --- a/settings.gradle +++ b/settings.gradle @@ -111,6 +111,10 @@ include( 'experimental:s3-client-aws', 'experimental:camunda-engine-bpmn', 'experimental:camunda-rest-undertow', + 'avro:avro-common', + 'avro:avro-module', + 'avro:avro-annotation-processor', + 'avro:avro-symbol-processor', ) apply from: 'dependencies.gradle'