From 40983a31795a2270556ffd993c808859cb83be65 Mon Sep 17 00:00:00 2001 From: Dian Qi Date: Fri, 18 Nov 2022 12:12:38 +0800 Subject: [PATCH] to #103, Support PB format serializer in Kafka Source --- .../DeserializationSchemaFactory.java | 9 + .../PbKafkaDeserializationSchema.java | 103 ++ .../legacy/kafka/common/KafkaPbMessage.java | 970 ++++++++++++++++++ .../kafka/source/KafkaSourceITCase.java | 80 +- .../src/test/resources/kafka.fds | 7 + .../src/test/resources/kafka.proto | 7 + .../resources/kafka_to_print_pb_format.json | 52 + 7 files changed, 1219 insertions(+), 9 deletions(-) create mode 100644 bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/main/java/com/bytedance/bitsail/connector/legacy/kafka/deserialization/PbKafkaDeserializationSchema.java create mode 100644 bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/java/com/bytedance/bitsail/connector/legacy/kafka/common/KafkaPbMessage.java create mode 100644 bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/resources/kafka.fds create mode 100644 bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/resources/kafka.proto create mode 100644 bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/resources/kafka_to_print_pb_format.json diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/main/java/com/bytedance/bitsail/connector/legacy/kafka/deserialization/DeserializationSchemaFactory.java b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/main/java/com/bytedance/bitsail/connector/legacy/kafka/deserialization/DeserializationSchemaFactory.java index 6eb34dbd6..95a07a05c 100644 --- a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/main/java/com/bytedance/bitsail/connector/legacy/kafka/deserialization/DeserializationSchemaFactory.java +++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/main/java/com/bytedance/bitsail/connector/legacy/kafka/deserialization/DeserializationSchemaFactory.java @@ -40,6 +40,7 @@ public class DeserializationSchemaFactory { private static final String STREAMING_FILE_DESERIALIZATION_SCHEMA_KEY = "streaming_file"; private static final String JSON_DESERIALIZATION_SCHEMA_KEY = "json"; + private static final String PB_DESERIALIZATION_SCHEMA_KEY = "protobuf"; public static KafkaDeserializationSchema getDeserializationSchema(BitSailConfiguration configuration) { String formatType = configuration.get(BaseMessageQueueReaderOptions.FORMAT_TYPE); @@ -64,6 +65,14 @@ public static KafkaDeserializationSchema getDeserializationSchema(BitSailCo .build()); } + if (StringUtils.equalsIgnoreCase(PB_DESERIALIZATION_SCHEMA_KEY, formatType)) { + try { + return new PbKafkaDeserializationSchema(configuration); + } catch (Exception e) { + throw new IllegalArgumentException("Pb parser encountered error during initialization.", e); + } + } + throw new IllegalArgumentException(String.format("Unsupported %s format type.", formatType)); } } diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/main/java/com/bytedance/bitsail/connector/legacy/kafka/deserialization/PbKafkaDeserializationSchema.java b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/main/java/com/bytedance/bitsail/connector/legacy/kafka/deserialization/PbKafkaDeserializationSchema.java new file mode 100644 index 000000000..704364d24 --- /dev/null +++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/main/java/com/bytedance/bitsail/connector/legacy/kafka/deserialization/PbKafkaDeserializationSchema.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.legacy.kafka.deserialization; + +import com.bytedance.bitsail.batch.file.parser.PbBytesParser; +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.flink.core.typeinfo.PrimitiveColumnTypeInfo; + +import com.google.protobuf.Descriptors; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; +import org.apache.flink.types.Row; +import org.apache.flink.util.Collector; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +import java.util.List; + +@Internal +public class PbKafkaDeserializationSchema implements KafkaDeserializationSchema { + private static final long serialVersionUID = -2556547991095476394L; + private final PbBytesParser parser; + private final RowTypeInfo rowTypeInfo; + private final int arity; + + public PbKafkaDeserializationSchema(BitSailConfiguration jobConf) throws Exception { + this.parser = new PbBytesParser(jobConf); + + List fields = parser.getDescriptor().getFields(); + this.arity = fields.size(); + PrimitiveColumnTypeInfo[] types = new PrimitiveColumnTypeInfo[arity]; + String[] fieldNames = new String[arity]; + for (int i = 0; i < arity; i++) { + Descriptors.FieldDescriptor field = fields.get(i); + types[i] = getColumnTypeInfo(field); + fieldNames[i] = field.getJsonName(); + } + this.rowTypeInfo = new RowTypeInfo(types, fieldNames); + } + + private PrimitiveColumnTypeInfo getColumnTypeInfo(Descriptors.FieldDescriptor field) { + switch (field.getJavaType()) { + case INT: + case LONG: + return PrimitiveColumnTypeInfo.LONG_COLUMN_TYPE_INFO; + case FLOAT: + case DOUBLE: + return PrimitiveColumnTypeInfo.DOUBLE_COLUMN_TYPE_INFO; + case BOOLEAN: + return PrimitiveColumnTypeInfo.BOOL_COLUMN_TYPE_INFO; + case BYTE_STRING: + return PrimitiveColumnTypeInfo.BYTES_COLUMN_TYPE_INFO; + case MESSAGE: + case STRING: + case ENUM: + default: + return PrimitiveColumnTypeInfo.STRING_COLUMN_TYPE_INFO; + } + } + + @Override + public void open(DeserializationSchema.InitializationContext context) throws Exception { + KafkaDeserializationSchema.super.open(context); + } + + @Override + public boolean isEndOfStream(Row row) { + return false; + } + + @Override + public Row deserialize(ConsumerRecord record) throws Exception { + byte[] value = record.value(); + return this.parser.parse(new Row(arity), value, 0, value.length, null, rowTypeInfo); + } + + @Override + public void deserialize(ConsumerRecord message, Collector out) throws Exception { + out.collect(deserialize(message)); + } + + @Override + public TypeInformation getProducedType() { + return rowTypeInfo; + } +} diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/java/com/bytedance/bitsail/connector/legacy/kafka/common/KafkaPbMessage.java b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/java/com/bytedance/bitsail/connector/legacy/kafka/common/KafkaPbMessage.java new file mode 100644 index 000000000..aa5b37fa6 --- /dev/null +++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/java/com/bytedance/bitsail/connector/legacy/kafka/common/KafkaPbMessage.java @@ -0,0 +1,970 @@ + +package com.bytedance.bitsail.connector.legacy.kafka.common; // Generated by the protocol buffer compiler. DO NOT EDIT! +@SuppressWarnings({ + "checkstyle:MagicNumber", + "checkstyle:IllegalTokenText", + "checkstyle:EmptyLineSeparator", + "checkstyle:OverloadMethodsDeclarationOrder", + "checkstyle:WhitespaceAfter", + "checkstyle:NoWhitespaceBefore", + "checkstyle:ConstantName", + "checkstyle:LocalVariableName", + "checkstyle:EmptyBlock", + "checkstyle:OperatorWrap", + "checkstyle:LeftCurly", + "checkstyle:NeedBraces", + "checkstyle:Indentation", + "checkstyle:MemberName", + "checkstyle:AnnotationLocation" +}) +public final class KafkaPbMessage { + private KafkaPbMessage() {} + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistryLite registry) { + } + + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistry registry) { + registerAllExtensions( + (com.google.protobuf.ExtensionRegistryLite) registry); + } + public interface ProtoTestOrBuilder extends + // @@protoc_insertion_point(interface_extends:ProtoTest) + com.google.protobuf.MessageOrBuilder { + + /** + * required int64 ID = 1; + * @return Whether the iD field is set. + */ + boolean hasID(); + /** + * required int64 ID = 1; + * @return The iD. + */ + long getID(); + + /** + * required string NAME = 2; + * @return Whether the nAME field is set. + */ + boolean hasNAME(); + /** + * required string NAME = 2; + * @return The nAME. + */ + java.lang.String getNAME(); + /** + * required string NAME = 2; + * @return The bytes for nAME. + */ + com.google.protobuf.ByteString + getNAMEBytes(); + + /** + * required string DATE = 3; + * @return Whether the dATE field is set. + */ + boolean hasDATE(); + /** + * required string DATE = 3; + * @return The dATE. + */ + java.lang.String getDATE(); + /** + * required string DATE = 3; + * @return The bytes for dATE. + */ + com.google.protobuf.ByteString + getDATEBytes(); + } + /** + * Protobuf type {@code ProtoTest} + */ + public static final class ProtoTest extends + com.google.protobuf.GeneratedMessageV3 implements + // @@protoc_insertion_point(message_implements:ProtoTest) + ProtoTestOrBuilder { + private static final long serialVersionUID = 0L; + // Use ProtoTest.newBuilder() to construct. + private ProtoTest(com.google.protobuf.GeneratedMessageV3.Builder builder) { + super(builder); + } + private ProtoTest() { + nAME_ = ""; + dATE_ = ""; + } + + @java.lang.Override + @SuppressWarnings({"unused"}) + protected java.lang.Object newInstance( + UnusedPrivateParameter unused) { + return new ProtoTest(); + } + + @java.lang.Override + public final com.google.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private ProtoTest( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + this(); + if (extensionRegistry == null) { + throw new java.lang.NullPointerException(); + } + int mutable_bitField0_ = 0; + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + case 8: { + bitField0_ |= 0x00000001; + iD_ = input.readInt64(); + break; + } + case 18: { + com.google.protobuf.ByteString bs = input.readBytes(); + bitField0_ |= 0x00000002; + nAME_ = bs; + break; + } + case 26: { + com.google.protobuf.ByteString bs = input.readBytes(); + bitField0_ |= 0x00000004; + dATE_ = bs; + break; + } + default: { + if (!parseUnknownField( + input, unknownFields, extensionRegistry, tag)) { + done = true; + } + break; + } + } + } + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException( + e).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return KafkaPbMessage.internal_static_ProtoTest_descriptor; + } + + @java.lang.Override + protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable + internalGetFieldAccessorTable() { + return KafkaPbMessage.internal_static_ProtoTest_fieldAccessorTable + .ensureFieldAccessorsInitialized( + KafkaPbMessage.ProtoTest.class, KafkaPbMessage.ProtoTest.Builder.class); + } + + private int bitField0_; + public static final int ID_FIELD_NUMBER = 1; + private long iD_; + /** + * required int64 ID = 1; + * @return Whether the iD field is set. + */ + @java.lang.Override + public boolean hasID() { + return ((bitField0_ & 0x00000001) != 0); + } + /** + * required int64 ID = 1; + * @return The iD. + */ + @java.lang.Override + public long getID() { + return iD_; + } + + public static final int NAME_FIELD_NUMBER = 2; + private volatile java.lang.Object nAME_; + /** + * required string NAME = 2; + * @return Whether the nAME field is set. + */ + @java.lang.Override + public boolean hasNAME() { + return ((bitField0_ & 0x00000002) != 0); + } + /** + * required string NAME = 2; + * @return The nAME. + */ + @java.lang.Override + public java.lang.String getNAME() { + java.lang.Object ref = nAME_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + nAME_ = s; + } + return s; + } + } + /** + * required string NAME = 2; + * @return The bytes for nAME. + */ + @java.lang.Override + public com.google.protobuf.ByteString + getNAMEBytes() { + java.lang.Object ref = nAME_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + nAME_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + public static final int DATE_FIELD_NUMBER = 3; + private volatile java.lang.Object dATE_; + /** + * required string DATE = 3; + * @return Whether the dATE field is set. + */ + @java.lang.Override + public boolean hasDATE() { + return ((bitField0_ & 0x00000004) != 0); + } + /** + * required string DATE = 3; + * @return The dATE. + */ + @java.lang.Override + public java.lang.String getDATE() { + java.lang.Object ref = dATE_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + dATE_ = s; + } + return s; + } + } + /** + * required string DATE = 3; + * @return The bytes for dATE. + */ + @java.lang.Override + public com.google.protobuf.ByteString + getDATEBytes() { + java.lang.Object ref = dATE_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + dATE_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + private byte memoizedIsInitialized = -1; + @java.lang.Override + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized == 1) return true; + if (isInitialized == 0) return false; + + if (!hasID()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasNAME()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasDATE()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + @java.lang.Override + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + if (((bitField0_ & 0x00000001) != 0)) { + output.writeInt64(1, iD_); + } + if (((bitField0_ & 0x00000002) != 0)) { + com.google.protobuf.GeneratedMessageV3.writeString(output, 2, nAME_); + } + if (((bitField0_ & 0x00000004) != 0)) { + com.google.protobuf.GeneratedMessageV3.writeString(output, 3, dATE_); + } + unknownFields.writeTo(output); + } + + @java.lang.Override + public int getSerializedSize() { + int size = memoizedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) != 0)) { + size += com.google.protobuf.CodedOutputStream + .computeInt64Size(1, iD_); + } + if (((bitField0_ & 0x00000002) != 0)) { + size += com.google.protobuf.GeneratedMessageV3.computeStringSize(2, nAME_); + } + if (((bitField0_ & 0x00000004) != 0)) { + size += com.google.protobuf.GeneratedMessageV3.computeStringSize(3, dATE_); + } + size += unknownFields.getSerializedSize(); + memoizedSize = size; + return size; + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof KafkaPbMessage.ProtoTest)) { + return super.equals(obj); + } + KafkaPbMessage.ProtoTest other = (KafkaPbMessage.ProtoTest) obj; + + if (hasID() != other.hasID()) return false; + if (hasID()) { + if (getID() + != other.getID()) return false; + } + if (hasNAME() != other.hasNAME()) return false; + if (hasNAME()) { + if (!getNAME() + .equals(other.getNAME())) return false; + } + if (hasDATE() != other.hasDATE()) return false; + if (hasDATE()) { + if (!getDATE() + .equals(other.getDATE())) return false; + } + if (!unknownFields.equals(other.unknownFields)) return false; + return true; + } + + @java.lang.Override + public int hashCode() { + if (memoizedHashCode != 0) { + return memoizedHashCode; + } + int hash = 41; + hash = (19 * hash) + getDescriptor().hashCode(); + if (hasID()) { + hash = (37 * hash) + ID_FIELD_NUMBER; + hash = (53 * hash) + com.google.protobuf.Internal.hashLong( + getID()); + } + if (hasNAME()) { + hash = (37 * hash) + NAME_FIELD_NUMBER; + hash = (53 * hash) + getNAME().hashCode(); + } + if (hasDATE()) { + hash = (37 * hash) + DATE_FIELD_NUMBER; + hash = (53 * hash) + getDATE().hashCode(); + } + hash = (29 * hash) + unknownFields.hashCode(); + memoizedHashCode = hash; + return hash; + } + + public static KafkaPbMessage.ProtoTest parseFrom( + java.nio.ByteBuffer data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static KafkaPbMessage.ProtoTest parseFrom( + java.nio.ByteBuffer data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static KafkaPbMessage.ProtoTest parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static KafkaPbMessage.ProtoTest parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static KafkaPbMessage.ProtoTest parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static KafkaPbMessage.ProtoTest parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static KafkaPbMessage.ProtoTest parseFrom(java.io.InputStream input) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3 + .parseWithIOException(PARSER, input); + } + public static KafkaPbMessage.ProtoTest parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3 + .parseWithIOException(PARSER, input, extensionRegistry); + } + public static KafkaPbMessage.ProtoTest parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3 + .parseDelimitedWithIOException(PARSER, input); + } + public static KafkaPbMessage.ProtoTest parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3 + .parseDelimitedWithIOException(PARSER, input, extensionRegistry); + } + public static KafkaPbMessage.ProtoTest parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3 + .parseWithIOException(PARSER, input); + } + public static KafkaPbMessage.ProtoTest parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3 + .parseWithIOException(PARSER, input, extensionRegistry); + } + + @java.lang.Override + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder() { + return DEFAULT_INSTANCE.toBuilder(); + } + public static Builder newBuilder(KafkaPbMessage.ProtoTest prototype) { + return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype); + } + @java.lang.Override + public Builder toBuilder() { + return this == DEFAULT_INSTANCE + ? new Builder() : new Builder().mergeFrom(this); + } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessageV3.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code ProtoTest} + */ + public static final class Builder extends + com.google.protobuf.GeneratedMessageV3.Builder implements + // @@protoc_insertion_point(builder_implements:ProtoTest) + KafkaPbMessage.ProtoTestOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return KafkaPbMessage.internal_static_ProtoTest_descriptor; + } + + @java.lang.Override + protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable + internalGetFieldAccessorTable() { + return KafkaPbMessage.internal_static_ProtoTest_fieldAccessorTable + .ensureFieldAccessorsInitialized( + KafkaPbMessage.ProtoTest.class, KafkaPbMessage.ProtoTest.Builder.class); + } + + // Construct using KafkaPbMessage.ProtoTest.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + com.google.protobuf.GeneratedMessageV3.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessageV3 + .alwaysUseFieldBuilders) { + } + } + @java.lang.Override + public Builder clear() { + super.clear(); + iD_ = 0L; + bitField0_ = (bitField0_ & ~0x00000001); + nAME_ = ""; + bitField0_ = (bitField0_ & ~0x00000002); + dATE_ = ""; + bitField0_ = (bitField0_ & ~0x00000004); + return this; + } + + @java.lang.Override + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return KafkaPbMessage.internal_static_ProtoTest_descriptor; + } + + @java.lang.Override + public KafkaPbMessage.ProtoTest getDefaultInstanceForType() { + return KafkaPbMessage.ProtoTest.getDefaultInstance(); + } + + @java.lang.Override + public KafkaPbMessage.ProtoTest build() { + KafkaPbMessage.ProtoTest result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + @java.lang.Override + public KafkaPbMessage.ProtoTest buildPartial() { + KafkaPbMessage.ProtoTest result = new KafkaPbMessage.ProtoTest(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) != 0)) { + result.iD_ = iD_; + to_bitField0_ |= 0x00000001; + } + if (((from_bitField0_ & 0x00000002) != 0)) { + to_bitField0_ |= 0x00000002; + } + result.nAME_ = nAME_; + if (((from_bitField0_ & 0x00000004) != 0)) { + to_bitField0_ |= 0x00000004; + } + result.dATE_ = dATE_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + @java.lang.Override + public Builder clone() { + return super.clone(); + } + @java.lang.Override + public Builder setField( + com.google.protobuf.Descriptors.FieldDescriptor field, + java.lang.Object value) { + return super.setField(field, value); + } + @java.lang.Override + public Builder clearField( + com.google.protobuf.Descriptors.FieldDescriptor field) { + return super.clearField(field); + } + @java.lang.Override + public Builder clearOneof( + com.google.protobuf.Descriptors.OneofDescriptor oneof) { + return super.clearOneof(oneof); + } + @java.lang.Override + public Builder setRepeatedField( + com.google.protobuf.Descriptors.FieldDescriptor field, + int index, java.lang.Object value) { + return super.setRepeatedField(field, index, value); + } + @java.lang.Override + public Builder addRepeatedField( + com.google.protobuf.Descriptors.FieldDescriptor field, + java.lang.Object value) { + return super.addRepeatedField(field, value); + } + @java.lang.Override + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof KafkaPbMessage.ProtoTest) { + return mergeFrom((KafkaPbMessage.ProtoTest)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(KafkaPbMessage.ProtoTest other) { + if (other == KafkaPbMessage.ProtoTest.getDefaultInstance()) return this; + if (other.hasID()) { + setID(other.getID()); + } + if (other.hasNAME()) { + bitField0_ |= 0x00000002; + nAME_ = other.nAME_; + onChanged(); + } + if (other.hasDATE()) { + bitField0_ |= 0x00000004; + dATE_ = other.dATE_; + onChanged(); + } + this.mergeUnknownFields(other.unknownFields); + onChanged(); + return this; + } + + @java.lang.Override + public final boolean isInitialized() { + if (!hasID()) { + return false; + } + if (!hasNAME()) { + return false; + } + if (!hasDATE()) { + return false; + } + return true; + } + + @java.lang.Override + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + KafkaPbMessage.ProtoTest parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (KafkaPbMessage.ProtoTest) e.getUnfinishedMessage(); + throw e.unwrapIOException(); + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + private long iD_ ; + /** + * required int64 ID = 1; + * @return Whether the iD field is set. + */ + @java.lang.Override + public boolean hasID() { + return ((bitField0_ & 0x00000001) != 0); + } + /** + * required int64 ID = 1; + * @return The iD. + */ + @java.lang.Override + public long getID() { + return iD_; + } + /** + * required int64 ID = 1; + * @param value The iD to set. + * @return This builder for chaining. + */ + public Builder setID(long value) { + bitField0_ |= 0x00000001; + iD_ = value; + onChanged(); + return this; + } + /** + * required int64 ID = 1; + * @return This builder for chaining. + */ + public Builder clearID() { + bitField0_ = (bitField0_ & ~0x00000001); + iD_ = 0L; + onChanged(); + return this; + } + + private java.lang.Object nAME_ = ""; + /** + * required string NAME = 2; + * @return Whether the nAME field is set. + */ + public boolean hasNAME() { + return ((bitField0_ & 0x00000002) != 0); + } + /** + * required string NAME = 2; + * @return The nAME. + */ + public java.lang.String getNAME() { + java.lang.Object ref = nAME_; + if (!(ref instanceof java.lang.String)) { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + nAME_ = s; + } + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * required string NAME = 2; + * @return The bytes for nAME. + */ + public com.google.protobuf.ByteString + getNAMEBytes() { + java.lang.Object ref = nAME_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + nAME_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * required string NAME = 2; + * @param value The nAME to set. + * @return This builder for chaining. + */ + public Builder setNAME( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + nAME_ = value; + onChanged(); + return this; + } + /** + * required string NAME = 2; + * @return This builder for chaining. + */ + public Builder clearNAME() { + bitField0_ = (bitField0_ & ~0x00000002); + nAME_ = getDefaultInstance().getNAME(); + onChanged(); + return this; + } + /** + * required string NAME = 2; + * @param value The bytes for nAME to set. + * @return This builder for chaining. + */ + public Builder setNAMEBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + nAME_ = value; + onChanged(); + return this; + } + + private java.lang.Object dATE_ = ""; + /** + * required string DATE = 3; + * @return Whether the dATE field is set. + */ + public boolean hasDATE() { + return ((bitField0_ & 0x00000004) != 0); + } + /** + * required string DATE = 3; + * @return The dATE. + */ + public java.lang.String getDATE() { + java.lang.Object ref = dATE_; + if (!(ref instanceof java.lang.String)) { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + dATE_ = s; + } + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * required string DATE = 3; + * @return The bytes for dATE. + */ + public com.google.protobuf.ByteString + getDATEBytes() { + java.lang.Object ref = dATE_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + dATE_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * required string DATE = 3; + * @param value The dATE to set. + * @return This builder for chaining. + */ + public Builder setDATE( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000004; + dATE_ = value; + onChanged(); + return this; + } + /** + * required string DATE = 3; + * @return This builder for chaining. + */ + public Builder clearDATE() { + bitField0_ = (bitField0_ & ~0x00000004); + dATE_ = getDefaultInstance().getDATE(); + onChanged(); + return this; + } + /** + * required string DATE = 3; + * @param value The bytes for dATE to set. + * @return This builder for chaining. + */ + public Builder setDATEBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000004; + dATE_ = value; + onChanged(); + return this; + } + @java.lang.Override + public final Builder setUnknownFields( + final com.google.protobuf.UnknownFieldSet unknownFields) { + return super.setUnknownFields(unknownFields); + } + + @java.lang.Override + public final Builder mergeUnknownFields( + final com.google.protobuf.UnknownFieldSet unknownFields) { + return super.mergeUnknownFields(unknownFields); + } + + + // @@protoc_insertion_point(builder_scope:ProtoTest) + } + + // @@protoc_insertion_point(class_scope:ProtoTest) + private static final KafkaPbMessage.ProtoTest DEFAULT_INSTANCE; + static { + DEFAULT_INSTANCE = new KafkaPbMessage.ProtoTest(); + } + + public static KafkaPbMessage.ProtoTest getDefaultInstance() { + return DEFAULT_INSTANCE; + } + + @java.lang.Deprecated public static final com.google.protobuf.Parser + PARSER = new com.google.protobuf.AbstractParser() { + @java.lang.Override + public ProtoTest parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return new ProtoTest(input, extensionRegistry); + } + }; + + public static com.google.protobuf.Parser parser() { + return PARSER; + } + + @java.lang.Override + public com.google.protobuf.Parser getParserForType() { + return PARSER; + } + + @java.lang.Override + public KafkaPbMessage.ProtoTest getDefaultInstanceForType() { + return DEFAULT_INSTANCE; + } + + } + + private static final com.google.protobuf.Descriptors.Descriptor + internal_static_ProtoTest_descriptor; + private static final + com.google.protobuf.GeneratedMessageV3.FieldAccessorTable + internal_static_ProtoTest_fieldAccessorTable; + + public static com.google.protobuf.Descriptors.FileDescriptor + getDescriptor() { + return descriptor; + } + private static com.google.protobuf.Descriptors.FileDescriptor + descriptor; + static { + java.lang.String[] descriptorData = { + "\n\013KafkaPbMessage.proto\"3\n\tProtoTest\022\n\n\002ID\030\001 \002(\003\022\014" + + "\n\004NAME\030\002 \002(\t\022\014\n\004DATE\030\003 \002(\t" + }; + descriptor = com.google.protobuf.Descriptors.FileDescriptor + .internalBuildGeneratedFileFrom(descriptorData, + new com.google.protobuf.Descriptors.FileDescriptor[] { + }); + internal_static_ProtoTest_descriptor = + getDescriptor().getMessageTypes().get(0); + internal_static_ProtoTest_fieldAccessorTable = new + com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( + internal_static_ProtoTest_descriptor, + new java.lang.String[] { "ID", "NAME", "DATE", }); + } + + // @@protoc_insertion_point(outer_class_scope) +} diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/java/com/bytedance/bitsail/connector/legacy/kafka/source/KafkaSourceITCase.java b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/java/com/bytedance/bitsail/connector/legacy/kafka/source/KafkaSourceITCase.java index 57587b673..5147a8701 100644 --- a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/java/com/bytedance/bitsail/connector/legacy/kafka/source/KafkaSourceITCase.java +++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/java/com/bytedance/bitsail/connector/legacy/kafka/source/KafkaSourceITCase.java @@ -18,12 +18,18 @@ package com.bytedance.bitsail.connector.legacy.kafka.source; import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.legacy.kafka.common.KafkaPbMessage; +import com.bytedance.bitsail.parser.option.RowParserOptions; import com.bytedance.bitsail.test.connector.test.EmbeddedFlinkCluster; import com.bytedance.bitsail.test.connector.test.testcontainers.kafka.KafkaCluster; import com.bytedance.bitsail.test.connector.test.utils.JobConfUtils; import com.alibaba.fastjson.JSONObject; import com.google.common.collect.Maps; +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import lombok.SneakyThrows; +import org.apache.commons.io.IOUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -33,6 +39,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.net.URI; +import java.util.Base64; import java.util.Map; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -59,34 +68,87 @@ private static String constructARecord(int index) { public void before() { kafkaCluster.startService(); kafkaCluster.createTopic(topicName); - startSendDataToKafka(); } - private void startSendDataToKafka() { + private void startSendJsonDataToKafka() { KafkaProducer producer = kafkaCluster.getProducer(topicName); ScheduledThreadPoolExecutor produceService = new ScheduledThreadPoolExecutor(1); AtomicInteger sendCount = new AtomicInteger(0); produceService.scheduleAtFixedRate(() -> { try { - for (int i = 0; i < 5000; ++i) { - String record = constructARecord(sendCount.getAndIncrement()); - producer.send(new ProducerRecord(topicName, record)); - } + for (int i = 0; i < 5000; ++i) { + String record = constructARecord(sendCount.getAndIncrement()); + producer.send(new ProducerRecord(topicName, record)); + } } catch (Exception e) { - LOG.error("failed to send a record"); + LOG.error("failed to send a record"); } finally { - LOG.info(">>> kafka produce count: {}", sendCount.get()); + LOG.info(">>> kafka produce count: {}", sendCount.get()); + } + }, 0, 1, TimeUnit.SECONDS); + } + + @SneakyThrows + private void startSendPbDataToKafka() { + + Descriptors.Descriptor type = KafkaPbMessage.getDescriptor().findMessageTypeByName("ProtoTest"); + + KafkaProducer producer = kafkaCluster.getProducer(topicName); + ScheduledThreadPoolExecutor produceService = new ScheduledThreadPoolExecutor(1); + AtomicInteger sendCount = new AtomicInteger(0); + produceService.scheduleAtFixedRate(() -> { + try { + for (int i = 0; i < 5000; ++i) { + DynamicMessage.Builder builder = DynamicMessage.newBuilder(type); + for (Descriptors.FieldDescriptor field : type.getFields()) { + switch (field.getJsonName()) { + case "NAME": + builder.setField(field, "text_" + i); + break; + case "DATE": + builder.setField(field, String.valueOf(System.currentTimeMillis())); + break; + case "ID": + builder.setField(field, (long) i); + break; + default: + builder.setField(field, String.valueOf(i)); + break; + } + } + DynamicMessage message = builder.build(); + producer.send(new ProducerRecord(topicName, new String(message.toByteArray()))); + sendCount.getAndIncrement(); + } + } catch (Exception e) { + LOG.error("failed to send a record"); + } finally { + LOG.info(">>> kafka produce count: {}", sendCount.get()); } }, 0, 1, TimeUnit.SECONDS); } @Test - public void testKafkaSource() throws Exception { + public void testKafkaSourceJsonFormat() throws Exception { + startSendJsonDataToKafka(); BitSailConfiguration configuration = JobConfUtils.fromClasspath("kafka_to_print.json"); updateConfiguration(configuration); EmbeddedFlinkCluster.submitJob(configuration); } + @Test + public void testKafkaSourcePbFormat() throws Exception { + startSendPbDataToKafka(); + BitSailConfiguration configuration = JobConfUtils.fromClasspath("kafka_to_print_pb_format.json"); + URI proto = KafkaSourceITCase.class.getClassLoader().getResource("kafka.fds").toURI(); + byte[] descriptor = IOUtils.toByteArray(new File(proto).toURI()); + + configuration.set(RowParserOptions.PROTO_DESCRIPTOR, Base64.getEncoder().encodeToString(descriptor)); + configuration.set(RowParserOptions.PROTO_CLASS_NAME, "ProtoTest"); + updateConfiguration(configuration); + EmbeddedFlinkCluster.submitJob(configuration); + } + protected void updateConfiguration(BitSailConfiguration jobConfiguration) { // jobConfiguration.set(FakeReaderOptions.TOTAL_COUNT, TOTAL_SEND_COUNT); diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/resources/kafka.fds b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/resources/kafka.fds new file mode 100644 index 000000000..12950ba66 --- /dev/null +++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/resources/kafka.fds @@ -0,0 +1,7 @@ + +Z + kafka.proto"C + ProtoTest +id (Rid +name ( Rname +date ( Rdatebproto3 \ No newline at end of file diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/resources/kafka.proto b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/resources/kafka.proto new file mode 100644 index 000000000..93556d367 --- /dev/null +++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/resources/kafka.proto @@ -0,0 +1,7 @@ +syntax = "proto3"; + +message ProtoTest { + int64 ID = 1; + string NAME = 2; + string DATE = 3; +} diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/resources/kafka_to_print_pb_format.json b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/resources/kafka_to_print_pb_format.json new file mode 100644 index 000000000..e08af60b4 --- /dev/null +++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/resources/kafka_to_print_pb_format.json @@ -0,0 +1,52 @@ +{ + "job": { + "common": { + "job_type": "STREAMING", + "job_plugin_lib_dir": "plugin", + "job_plugin_conf_dir": "plugin_conf", + "enable_dynamic_loader": true, + "instance_id": "1", + "internal_instance_id": "1", + "extra_properties": { + "update-mode": "append" + }, + "proto": { + "descriptor": "", + "class_name": "" + } + }, + "reader": { + "connector": { + "connector": { + "bootstrap.servers": "PLAINTEXT://localhost:9092", + "topic": "testTopic", + "startup-mode": "earliest-offset", + "group": { + "id": "test_consumer" + } + } + }, + "enable_count_mode": true, + "columns": [ + { + "name": "ID", + "type": "varchar" + }, + { + "name": "name", + "type": "varchar" + }, + { + "name": "DATE", + "type": "varchar" + } + ], + "format_type": "protobuf", + "child_connector_type": "kafka", + "class": "com.bytedance.bitsail.connector.legacy.kafka.source.KafkaSourceFunctionDAGBuilder" + }, + "writer": { + "class": "com.bytedance.bitsail.connector.legacy.print.sink.PrintSink" + } + } +} \ No newline at end of file