diff --git a/bitsail-components/bitsail-component-formats/bitsail-component-format-pb/pom.xml b/bitsail-components/bitsail-component-formats/bitsail-component-format-pb/pom.xml
new file mode 100644
index 000000000..a68da039a
--- /dev/null
+++ b/bitsail-components/bitsail-component-formats/bitsail-component-format-pb/pom.xml
@@ -0,0 +1,19 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>bitsail-component-formats</artifactId>
+        <groupId>com.bytedance.bitsail</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>bitsail-component-format-pb</artifactId>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
+
+</project>
\ No newline at end of file
diff --git a/bitsail-components/bitsail-component-formats/pom.xml b/bitsail-components/bitsail-component-formats/pom.xml
index 9c28f9e40..ef81d2cd7 100644
--- a/bitsail-components/bitsail-component-formats/pom.xml
+++ b/bitsail-components/bitsail-component-formats/pom.xml
@@ -32,6 +32,7 @@
     <packaging>pom</packaging>
 
     <modules>
         <module>bitsail-component-format-json</module>
+        <module>bitsail-component-format-pb</module>
     </modules>
 
diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/pom.xml b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/pom.xml
index 6dc069fc2..a79f5fd8a 100644
--- a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/pom.xml
+++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/pom.xml
@@ -98,6 +98,10 @@
             <version>${revision}</version>
            <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.bytedance.bitsail</groupId>
+            <artifactId>bitsail-flink-row-parser</artifactId>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/main/java/com/bytedance/bitsail/connector/legacy/kafka/deserialization/DeserializationSchemaFactory.java b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/main/java/com/bytedance/bitsail/connector/legacy/kafka/deserialization/DeserializationSchemaFactory.java
index 6eb34dbd6..f1251a65b 100644
--- a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/main/java/com/bytedance/bitsail/connector/legacy/kafka/deserialization/DeserializationSchemaFactory.java
+++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/main/java/com/bytedance/bitsail/connector/legacy/kafka/deserialization/DeserializationSchemaFactory.java
@@ -40,6 +40,7 @@ public class DeserializationSchemaFactory {
 
   private static final String STREAMING_FILE_DESERIALIZATION_SCHEMA_KEY = "streaming_file";
   private static final String JSON_DESERIALIZATION_SCHEMA_KEY = "json";
+  private static final String PB_DESERIALIZATION_SCHEMA_KEY = "protobuf";
 
   public static KafkaDeserializationSchema<Row> getDeserializationSchema(BitSailConfiguration configuration) {
     String formatType = configuration.get(BaseMessageQueueReaderOptions.FORMAT_TYPE);
@@ -64,6 +65,14 @@ public static KafkaDeserializationSchema<Row> getDeserializationSchema(BitSailCo
           .build());
     }
 
+    if (StringUtils.equalsIgnoreCase(PB_DESERIALIZATION_SCHEMA_KEY, formatType)) {
+      try {
+        return new CountKafkaDeserializationSchemaWrapper<>(configuration, new PbDeserializationSchema(configuration));
+      } catch (Exception e) {
+        throw new IllegalArgumentException("Pb parser encountered an error during initialization.", e);
+      }
+    }
+
     throw new IllegalArgumentException(String.format("Unsupported %s format type.", formatType));
   }
 }
diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/main/java/com/bytedance/bitsail/connector/legacy/kafka/deserialization/PbDeserializationSchema.java b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/main/java/com/bytedance/bitsail/connector/legacy/kafka/deserialization/PbDeserializationSchema.java
new file mode 100644
index 000000000..5506eabe5
--- /dev/null
+++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/main/java/com/bytedance/bitsail/connector/legacy/kafka/deserialization/PbDeserializationSchema.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.legacy.kafka.deserialization;
+
+import com.bytedance.bitsail.batch.file.parser.PbBytesParser;
+import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+import com.bytedance.bitsail.flink.core.typeinfo.PrimitiveColumnTypeInfo;
+
+import com.google.protobuf.Descriptors;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.List;
+
+@Internal
+public class PbDeserializationSchema implements DeserializationSchema<Row> {
+  private static final long serialVersionUID = -2556547991095476394L;
+  private final PbBytesParser parser;
+  private final RowTypeInfo rowTypeInfo;
+  private final int arity;
+
+  public PbDeserializationSchema(BitSailConfiguration jobConf) throws Exception {
+    this.parser = new PbBytesParser(jobConf);
+
+    List<Descriptors.FieldDescriptor> fields = parser.getDescriptor().getFields();
+    this.arity = fields.size();
+    PrimitiveColumnTypeInfo[] types = new PrimitiveColumnTypeInfo[arity];
+    String[] fieldNames = new String[arity];
+    for (int i = 0; i < arity; i++) {
+      Descriptors.FieldDescriptor field = fields.get(i);
+      types[i] = getColumnTypeInfo(field);
+      fieldNames[i] = field.getJsonName();
+    }
+    this.rowTypeInfo = new RowTypeInfo(types, fieldNames);
+  }
+
+  private PrimitiveColumnTypeInfo getColumnTypeInfo(Descriptors.FieldDescriptor field) {
+    switch (field.getJavaType()) {
+      case INT:
+      case LONG:
+        return PrimitiveColumnTypeInfo.LONG_COLUMN_TYPE_INFO;
+      case FLOAT:
+      case DOUBLE:
+        return PrimitiveColumnTypeInfo.DOUBLE_COLUMN_TYPE_INFO;
+      case BOOLEAN:
+        return PrimitiveColumnTypeInfo.BOOL_COLUMN_TYPE_INFO;
+      case BYTE_STRING:
+        return PrimitiveColumnTypeInfo.BYTES_COLUMN_TYPE_INFO;
+      case MESSAGE:
+      case STRING:
+      case ENUM:
+      default:
+        return PrimitiveColumnTypeInfo.STRING_COLUMN_TYPE_INFO;
+    }
+  }
+
+  @Override
+  public void open(DeserializationSchema.InitializationContext context) throws Exception {
+  }
+
+  @Override
+  public Row deserialize(byte[] value) throws IOException {
+    return this.parser.parse(new Row(arity), value, 0, value.length, null, rowTypeInfo);
+  }
+
+  @Override
+  public void deserialize(byte[] value, Collector<Row> out) throws IOException {
+    out.collect(deserialize(value));
+  }
+
+  @Override
+  public boolean isEndOfStream(Row row) {
+    return false;
+  }
+
+  @Override
+  public TypeInformation<Row> getProducedType() {
+    return rowTypeInfo;
+  }
+}
diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/java/com/bytedance/bitsail/connector/legacy/kafka/source/KafkaSourceITCase.java b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/java/com/bytedance/bitsail/connector/legacy/kafka/source/KafkaSourceITCase.java
index 57587b673..8f4e72120 100644
--- a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/java/com/bytedance/bitsail/connector/legacy/kafka/source/KafkaSourceITCase.java
+++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/java/com/bytedance/bitsail/connector/legacy/kafka/source/KafkaSourceITCase.java
@@ -17,22 +17,36 @@
 
 package com.bytedance.bitsail.connector.legacy.kafka.source;
 
+import com.bytedance.bitsail.batch.file.parser.PbBytesParser;
 import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+import com.bytedance.bitsail.parser.option.RowParserOptions;
 import com.bytedance.bitsail.test.connector.test.EmbeddedFlinkCluster;
 import com.bytedance.bitsail.test.connector.test.testcontainers.kafka.KafkaCluster;
 import com.bytedance.bitsail.test.connector.test.utils.JobConfUtils;
 
 import com.alibaba.fastjson.JSONObject;
 import com.google.common.collect.Maps;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import org.apache.commons.io.IOUtils;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
 
+import java.io.File;
+import java.net.URI;
+import java.util.Base64;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -46,6 +60,7 @@ public class KafkaSourceITCase {
   private static final int TOTAL_SEND_COUNT = 300;
   private final String topicName = "testTopic";
   private final KafkaCluster kafkaCluster = new KafkaCluster();
+  private ScheduledThreadPoolExecutor produceService;
 
   private static String constructARecord(int index) {
     JSONObject jsonObject = new JSONObject();
@@ -59,34 +74,120 @@ private static String constructARecord(int index) {
   public void before() {
     kafkaCluster.startService();
     kafkaCluster.createTopic(topicName);
-    startSendDataToKafka();
+    produceService = new ScheduledThreadPoolExecutor(1);
   }
 
-  private void startSendDataToKafka() {
+  private void startSendJsonDataToKafka() {
     KafkaProducer<String, String> producer = kafkaCluster.getProducer(topicName);
-    ScheduledThreadPoolExecutor produceService = new ScheduledThreadPoolExecutor(1);
     AtomicInteger sendCount = new AtomicInteger(0);
     produceService.scheduleAtFixedRate(() -> {
       try {
         for (int i = 0; i < 5000; ++i) {
           String record = constructARecord(sendCount.getAndIncrement());
           producer.send(new ProducerRecord<>(topicName, record));
         }
       } catch (Exception e) {
-        LOG.error("failed to send a record");
+        LOG.error("failed to send a record", e);
       } finally {
         LOG.info(">>> kafka produce count: {}", sendCount.get());
       }
     }, 0, 1, TimeUnit.SECONDS);
   }
 
+  private void startSendPbDataToKafka(BitSailConfiguration configuration) throws Exception {
+    PbBytesParser parser = new PbBytesParser(configuration);
+    List<Descriptors.FieldDescriptor> fields = parser.getDescriptor().getFields();
+    Descriptors.Descriptor type = parser.getDescriptor();
+
+    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(
+        ImmutableMap.of(
+            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaCluster.getBootstrapServer(),
+            ProducerConfig.CLIENT_ID_CONFIG, "producer"
+        ),
+        new StringSerializer(),
+        new ByteArraySerializer()
+    );
+    AtomicInteger sendCount = new AtomicInteger(0);
+    produceService.scheduleAtFixedRate(() -> {
+      try {
+        for (int i = 0; i < 5000; ++i) {
+          DynamicMessage.Builder builder = DynamicMessage.newBuilder(type);
+          for (Descriptors.FieldDescriptor field : fields) {
+            switch (field.getJavaType()) {
+              case INT:
+                builder.setField(field, i);
+                break;
+              case LONG:
+                builder.setField(field, (long) i);
+                break;
+              case FLOAT:
+                builder.setField(field, (float) i);
+                break;
+              case DOUBLE:
+                builder.setField(field, (double) i);
+                break;
+              case BOOLEAN:
+                builder.setField(field, i % 2 == 0);
+                break;
+              case BYTE_STRING:
+                builder.setField(field, ByteString.copyFrom(("bytes_" + i).getBytes()));
+                break;
+              case MESSAGE:
+              case STRING:
+              case ENUM:
+              default:
+                builder.setField(field, "text_" + i);
+                break;
+            }
+          }
+          DynamicMessage message = builder.build();
+          producer.send(new ProducerRecord<>(topicName, message.toByteArray()));
+          sendCount.getAndIncrement();
+        }
+      } catch (Exception e) {
+        LOG.error("failed to send a record", e);
+      } finally {
+        LOG.info(">>> kafka produce count: {}", sendCount.get());
+      }
+    }, 0, 1, TimeUnit.SECONDS);
+  }
 
   @Test
-  public void testKafkaSource() throws Exception {
+  public void testKafkaSourceJsonFormat() throws Exception {
+    startSendJsonDataToKafka();
     BitSailConfiguration configuration = JobConfUtils.fromClasspath("kafka_to_print.json");
     updateConfiguration(configuration);
     EmbeddedFlinkCluster.submitJob(configuration);
   }
 
+  @Test
+  public void testKafkaSourcePbFormat() throws Exception {
+    BitSailConfiguration configuration = JobConfUtils.fromClasspath("kafka_to_print_pb_format.json");
+    URI proto = KafkaSourceITCase.class.getClassLoader().getResource("kafka.fds").toURI();
+    byte[] descriptor = IOUtils.toByteArray(new File(proto).toURI());
+
+    String protoDescriptor = Base64.getEncoder().encodeToString(descriptor);
+    configuration.set(RowParserOptions.PROTO_DESCRIPTOR, protoDescriptor);
+    configuration.set(RowParserOptions.PROTO_CLASS_NAME, "ProtoTest");
+    startSendPbDataToKafka(configuration);
+    updateConfiguration(configuration);
+    EmbeddedFlinkCluster.submitJob(configuration);
+  }
+
+  @Test
+  public void testKafkaSourcePbFormatFullTypes() throws Exception {
+    BitSailConfiguration configuration = JobConfUtils.fromClasspath("kafka_to_print_pb_format_full_types.json");
+    URI proto = KafkaSourceITCase.class.getClassLoader().getResource("kafka_full_types.fds").toURI();
+    byte[] descriptor = IOUtils.toByteArray(new File(proto).toURI());
+
+    String protoDescriptor = Base64.getEncoder().encodeToString(descriptor);
+    configuration.set(RowParserOptions.PROTO_DESCRIPTOR, protoDescriptor);
+    configuration.set(RowParserOptions.PROTO_CLASS_NAME, "ProtoTest");
+    startSendPbDataToKafka(configuration);
+    updateConfiguration(configuration);
+    EmbeddedFlinkCluster.submitJob(configuration);
+  }
+
   protected void updateConfiguration(BitSailConfiguration jobConfiguration) {
     // jobConfiguration.set(FakeReaderOptions.TOTAL_COUNT, TOTAL_SEND_COUNT);
@@ -98,6 +199,7 @@ protected void updateConfiguration(BitSailConfiguration jobConfiguration) {
 
   @After
   public void after() {
+    produceService.shutdown();
     kafkaCluster.stopService();
   }
 
diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/resources/kafka.fds b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/resources/kafka.fds
new file mode 100644
index 000000000..12950ba66
--- /dev/null
+++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/resources/kafka.fds
@@ -0,0 +1,7 @@
+
+Z
+ kafka.proto"C
+ ProtoTest
+id (Rid
+name ( Rname
+date ( Rdatebproto3
\ No newline at end of file
diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/resources/kafka.proto b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/resources/kafka.proto
new file mode 100644
index 000000000..93556d367
--- /dev/null
+++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/resources/kafka.proto
@@ -0,0 +1,7 @@
+syntax = "proto3";
+
+message ProtoTest {
+  int64 id = 1;
+  string name = 2;
+  string date = 3;
+}
diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/resources/kafka_full_types.fds b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/resources/kafka_full_types.fds
new file mode 100644
index 000000000..edbe74353
--- /dev/null
+++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/resources/kafka_full_types.fds
@@ -0,0 +1,20 @@
+
+¢
+kafka_full_types.proto"ÿ
+ ProtoTest
+field1 (Rfield1
+field2 (Rfield2
+field3 (Rfield3
+field4 (Rfield4
+field5 ( Rfield5
+field6 (Rfield6
+field7 (Rfield7
+field8 (Rfield8
+field9 (Rfield9
+field10
+ (Rfield10
+field11 (Rfield11
+field12 (Rfield12
+field13 (Rfield13
+field14 ( Rfield14
+field15 ( Rfield15bproto3
\ No newline at end of file
diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/resources/kafka_full_types.proto b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/resources/kafka_full_types.proto
new file mode 100644
index 000000000..88f2f05a1
--- /dev/null
+++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/resources/kafka_full_types.proto
@@ -0,0 +1,19 @@
+syntax = "proto3";
+
+message ProtoTest {
+  double field1 = 1;
+  float field2 = 2;
+  int32 field3 = 3;
+  int64 field4 = 4;
+  uint32 field5 = 5;
+  uint64 field6 = 6;
+  sint32 field7 = 7;
+  sint64 field8 = 8;
+  fixed32 field9 = 9;
+  fixed64 field10 = 10;
+  sfixed32 field11 = 11;
+  sfixed64 field12 = 12;
+  bool field13 = 13;
+  string field14 = 14;
+  bytes field15 = 15;
+}
diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/resources/kafka_to_print_pb_format.json b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/resources/kafka_to_print_pb_format.json
new file mode 100644
index 000000000..88f901315
--- /dev/null
+++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/resources/kafka_to_print_pb_format.json
@@ -0,0 +1,39 @@
+{
+  "job": {
+    "common": {
+      "job_type": "STREAMING",
+ "job_plugin_lib_dir": "plugin", + "job_plugin_conf_dir": "plugin_conf", + "enable_dynamic_loader": true, + "instance_id": "1", + "internal_instance_id": "1", + "extra_properties": { + "update-mode": "append" + }, + "proto": { + "descriptor": "CloKC2thZmthLnByb3RvIkMKCVByb3RvVGVzdBIOCgJpZBgBIAEoA1ICaWQSEgoEbmFtZRgCIAEoCVIEbmFtZRISCgRkYXRlGAMgASgJUgRkYXRlYgZwcm90bzM=", + "class_name": "ProtoTest" + } + }, + "reader": { + "connector": { + "connector": { + "bootstrap.servers": "PLAINTEXT://localhost:9092", + "topic": "testTopic", + "startup-mode": "earliest-offset", + "group": { + "id": "test_consumer" + } + } + }, + "enable_count_mode": true, + "count_mode_record_threshold": 1000, + "format_type": "protobuf", + "child_connector_type": "kafka", + "class": "com.bytedance.bitsail.connector.legacy.kafka.source.KafkaSourceFunctionDAGBuilder" + }, + "writer": { + "class": "com.bytedance.bitsail.connector.legacy.print.sink.PrintSink" + } + } +} \ No newline at end of file diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/resources/kafka_to_print_pb_format_full_types.json b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/resources/kafka_to_print_pb_format_full_types.json new file mode 100644 index 000000000..7d5ed2b2f --- /dev/null +++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/resources/kafka_to_print_pb_format_full_types.json @@ -0,0 +1,39 @@ +{ + "job": { + "common": { + "job_type": "STREAMING", + "job_plugin_lib_dir": "plugin", + "job_plugin_conf_dir": "plugin_conf", + "enable_dynamic_loader": true, + "instance_id": "1", + "internal_instance_id": "1", + "extra_properties": { + "update-mode": "append" + }, + "proto": { + "descriptor": "CqIDChZrYWZrYV9mdWxsX3R5cGVzLnByb3RvIv8CCglQcm90b1Rlc3QSFgoGZmllbGQxGAEgASgBUgZmaWVsZDESFgoGZmllbGQyGAIgASgCUgZmaWVsZDISFgoGZmllbGQzGAMgASgFUgZmaWVsZDMSFgoGZmllbGQ0GAQgASgDUgZmaWVsZDQSFgoGZmllbGQ1GAUgASgNUgZmaWVsZDUSFgoGZmllbGQ2GAYgASgEUgZmaWVsZDYSFgoGZmllbGQ3GAcgASgRUgZmaWVsZDcSFgoGZmllbGQ4GAggASgSUgZmaWVsZDgSFgoGZmllbGQ5GAkgASgHUgZmaWVsZDkSGAoHZmllbGQxMBgKIAEoBlIHZmllbGQxMBIYCgdmaWVsZDExGAsgASgPUgdmaWVsZDExEhgKB2ZpZWxkMTIYDCABKBBSB2ZpZWxkMTISGAoHZmllbGQxMxgNIAEoCFIHZmllbGQxMxIYCgdmaWVsZDE0GA4gASgJUgdmaWVsZDE0EhgKB2ZpZWxkMTUYDyABKAxSB2ZpZWxkMTViBnByb3RvMw==", + "class_name": "ProtoTest" + } + }, + "reader": { + "connector": { + "connector": { + "bootstrap.servers": "PLAINTEXT://localhost:9092", + "topic": "testTopic", + "startup-mode": "earliest-offset", + "group": { + "id": "test_consumer" + } + } + }, + "enable_count_mode": true, + "count_mode_record_threshold": 1000, + "format_type": "protobuf", + "child_connector_type": "kafka", + "class": "com.bytedance.bitsail.connector.legacy.kafka.source.KafkaSourceFunctionDAGBuilder" + }, + "writer": { + "class": "com.bytedance.bitsail.connector.legacy.print.sink.PrintSink" + } + } +} \ No newline at end of file