Commit 00fca9e: #103, Support PB format serializer in Kafka Source
qidian99 committed Nov 22, 2022 (1 parent: 7bd7aef)
Showing 12 changed files with 376 additions and 10 deletions.
19 changes: 19 additions & 0 deletions bitsail-components/bitsail-component-formats/bitsail-component-format-pb/pom.xml
@@ -0,0 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bitsail-component-formats</artifactId>
<groupId>com.bytedance.bitsail</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>bitsail-component-format-pb</artifactId>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

</project>
1 change: 1 addition & 0 deletions bitsail-components/bitsail-component-formats/pom.xml
@@ -32,6 +32,7 @@
<packaging>pom</packaging>
<modules>
<module>bitsail-component-format-json</module>
<module>bitsail-component-format-pb</module>
</modules>

<dependencies>
4 changes: 4 additions & 0 deletions pom.xml (file path not shown)
@@ -98,6 +98,10 @@
<version>${revision}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.bytedance.bitsail</groupId>
<artifactId>bitsail-flink-row-parser</artifactId>
</dependency>
</dependencies>

</project>
DeserializationSchemaFactory.java
@@ -40,6 +40,7 @@
public class DeserializationSchemaFactory {
private static final String STREAMING_FILE_DESERIALIZATION_SCHEMA_KEY = "streaming_file";
private static final String JSON_DESERIALIZATION_SCHEMA_KEY = "json";
private static final String PB_DESERIALIZATION_SCHEMA_KEY = "protobuf";

public static KafkaDeserializationSchema<Row> getDeserializationSchema(BitSailConfiguration configuration) {
String formatType = configuration.get(BaseMessageQueueReaderOptions.FORMAT_TYPE);
@@ -64,6 +65,14 @@ public static KafkaDeserializationSchema<Row> getDeserializationSchema(BitSailConfiguration configuration) {
.build());
}

if (StringUtils.equalsIgnoreCase(PB_DESERIALIZATION_SCHEMA_KEY, formatType)) {
try {
return new CountKafkaDeserializationSchemaWrapper<>(configuration, new PbDeserializationSchema(configuration));
} catch (Exception e) {
throw new IllegalArgumentException("Pb parser encountered error during initialization.", e);
}
}

throw new IllegalArgumentException(String.format("Unsupported %s format type.", formatType));
}
}
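For context, a minimal sketch of how this dispatch point is reached from a job configuration. It assumes DeserializationSchemaFactory sits in the same package as PbDeserializationSchema (no new import appears in this hunk) and that the job file's format type resolves to "protobuf"; the sketch class itself is illustrative, not part of the commit.

import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.connector.legacy.kafka.deserialization.DeserializationSchemaFactory; // package assumed, see note above

import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.types.Row;

public class PbFormatDispatchSketch {

  // With FORMAT_TYPE resolving to "protobuf", the factory returns a
  // CountKafkaDeserializationSchemaWrapper around PbDeserializationSchema;
  // any unsupported value falls through to the IllegalArgumentException above.
  public static KafkaDeserializationSchema<Row> buildSchema(BitSailConfiguration jobConf) {
    return DeserializationSchemaFactory.getDeserializationSchema(jobConf);
  }
}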
100 changes: 100 additions & 0 deletions PbDeserializationSchema.java
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.legacy.kafka.deserialization;

import com.bytedance.bitsail.batch.file.parser.PbBytesParser;
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.flink.core.typeinfo.PrimitiveColumnTypeInfo;

import com.google.protobuf.Descriptors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import java.io.IOException;
import java.util.List;

@Internal
public class PbDeserializationSchema implements DeserializationSchema<Row> {
private static final long serialVersionUID = -2556547991095476394L;
private final PbBytesParser parser;
private final RowTypeInfo rowTypeInfo;
private final int arity;

public PbDeserializationSchema(BitSailConfiguration jobConf) throws Exception {
this.parser = new PbBytesParser(jobConf);

List<Descriptors.FieldDescriptor> fields = parser.getDescriptor().getFields();
this.arity = fields.size();
PrimitiveColumnTypeInfo<?>[] types = new PrimitiveColumnTypeInfo[arity];
String[] fieldNames = new String[arity];
for (int i = 0; i < arity; i++) {
Descriptors.FieldDescriptor field = fields.get(i);
types[i] = getColumnTypeInfo(field);
fieldNames[i] = field.getJsonName();
}
this.rowTypeInfo = new RowTypeInfo(types, fieldNames);
}

private PrimitiveColumnTypeInfo<?> getColumnTypeInfo(Descriptors.FieldDescriptor field) {
switch (field.getJavaType()) {
case INT:
case LONG:
return PrimitiveColumnTypeInfo.LONG_COLUMN_TYPE_INFO;
case FLOAT:
case DOUBLE:
return PrimitiveColumnTypeInfo.DOUBLE_COLUMN_TYPE_INFO;
case BOOLEAN:
return PrimitiveColumnTypeInfo.BOOL_COLUMN_TYPE_INFO;
case BYTE_STRING:
return PrimitiveColumnTypeInfo.BYTES_COLUMN_TYPE_INFO;
case MESSAGE:
case STRING:
case ENUM:
default:
return PrimitiveColumnTypeInfo.STRING_COLUMN_TYPE_INFO;
}
}

@Override
public void open(DeserializationSchema.InitializationContext context) throws Exception {
}

@Override
public Row deserialize(byte[] value) throws IOException {
return this.parser.parse(new Row(arity), value, 0, value.length, null, rowTypeInfo);
}

@Override
public void deserialize(byte[] value, Collector<Row> out) throws IOException {
out.collect(deserialize(value));
}

@Override
public boolean isEndOfStream(Row row) {
return false;
}

@Override
public TypeInformation<Row> getProducedType() {
return rowTypeInfo;
}
}
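A minimal usage sketch of the new schema, assuming a BitSailConfiguration prepared the way the ITCase below prepares it (base64 descriptor under RowParserOptions.PROTO_DESCRIPTOR, message name under PROTO_CLASS_NAME) and a serialized protobuf payload; the class and method names in the sketch are illustrative only.

import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.connector.legacy.kafka.deserialization.PbDeserializationSchema;

import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

public class PbDeserializationSketch {

  // jobConf is assumed to already carry PROTO_DESCRIPTOR and PROTO_CLASS_NAME,
  // exactly as KafkaSourceITCase sets them before submitting the job.
  public static Row decodeOne(BitSailConfiguration jobConf, byte[] pbPayload) throws Exception {
    PbDeserializationSchema schema = new PbDeserializationSchema(jobConf);

    // Field names come from FieldDescriptor#getJsonName; column types follow
    // getColumnTypeInfo above (e.g. int64 -> LONG column, string -> STRING column).
    RowTypeInfo typeInfo = (RowTypeInfo) schema.getProducedType();
    System.out.println(String.join(", ", typeInfo.getFieldNames()));

    return schema.deserialize(pbPayload);
  }
}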
KafkaSourceITCase.java
@@ -17,22 +17,36 @@

package com.bytedance.bitsail.connector.legacy.kafka.source;

import com.bytedance.bitsail.batch.file.parser.PbBytesParser;
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.parser.option.RowParserOptions;
import com.bytedance.bitsail.test.connector.test.EmbeddedFlinkCluster;
import com.bytedance.bitsail.test.connector.test.testcontainers.kafka.KafkaCluster;
import com.bytedance.bitsail.test.connector.test.utils.JobConfUtils;

import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import org.apache.commons.io.IOUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;

import java.io.File;
import java.net.URI;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -46,6 +60,7 @@ public class KafkaSourceITCase {
private static final int TOTAL_SEND_COUNT = 300;
private final String topicName = "testTopic";
private final KafkaCluster kafkaCluster = new KafkaCluster();
private ScheduledThreadPoolExecutor produceService;

private static String constructARecord(int index) {
JSONObject jsonObject = new JSONObject();
@@ -59,34 +74,120 @@ private static String constructARecord(int index) {
public void before() {
kafkaCluster.startService();
kafkaCluster.createTopic(topicName);
produceService = new ScheduledThreadPoolExecutor(1);
}

private void startSendJsonDataToKafka() {
KafkaProducer<String, String> producer = kafkaCluster.getProducer(topicName);
AtomicInteger sendCount = new AtomicInteger(0);
produceService.scheduleAtFixedRate(() -> {
try {
for (int i = 0; i < 5000; ++i) {
String record = constructARecord(sendCount.getAndIncrement());
producer.send(new ProducerRecord(topicName, record));
}
} catch (Exception e) {
LOG.error("failed to send a record");
} finally {
LOG.info(">>> kafka produce count: {}", sendCount.get());
}
}, 0, 1, TimeUnit.SECONDS);
}

private void startSendPbDataToKafka(BitSailConfiguration configuration) throws Exception {
PbBytesParser parser = new PbBytesParser(configuration);
List<Descriptors.FieldDescriptor> fields = parser.getDescriptor().getFields();
Descriptors.Descriptor type = parser.getDescriptor();

KafkaProducer<String, byte[]> producer = new KafkaProducer<>(
ImmutableMap.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaCluster.getBootstrapServer(),
ProducerConfig.CLIENT_ID_CONFIG, "producer"
),
new StringSerializer(),
new ByteArraySerializer()
);
AtomicInteger sendCount = new AtomicInteger(0);
produceService.scheduleAtFixedRate(() -> {
try {
for (int i = 0; i < 5000; ++i) {
DynamicMessage.Builder builder = DynamicMessage.newBuilder(type);
for (Descriptors.FieldDescriptor field : fields) {
switch (field.getJavaType()) {
case INT:
builder.setField(field, i);
break;
case LONG:
builder.setField(field, (long) i);
break;
case FLOAT:
builder.setField(field, (float) i);
break;
case DOUBLE:
builder.setField(field, (double) i);
break;
case BOOLEAN:
builder.setField(field, i % 2 == 0);
break;
case BYTE_STRING:
builder.setField(field, ByteString.copyFrom(("bytes_" + i).getBytes()));
break;
case MESSAGE:
case STRING:
case ENUM:
default:
builder.setField(field, "text_" + i);
break;
}
}
DynamicMessage message = builder.build();
producer.send(new ProducerRecord(topicName, message.toByteArray()));
sendCount.getAndIncrement();
}
} catch (Exception e) {
LOG.error("failed to send a record");
} finally {
LOG.info(">>> kafka produce count: {}", sendCount.get());
}
}, 0, 1, TimeUnit.SECONDS);
}

@Test
public void testKafkaSourceJsonFormat() throws Exception {
startSendJsonDataToKafka();
BitSailConfiguration configuration = JobConfUtils.fromClasspath("kafka_to_print.json");
updateConfiguration(configuration);
EmbeddedFlinkCluster.submitJob(configuration);
}

@Test
public void testKafkaSourcePbFormat() throws Exception {
BitSailConfiguration configuration = JobConfUtils.fromClasspath("kafka_to_print_pb_format.json");
URI proto = KafkaSourceITCase.class.getClassLoader().getResource("kafka.fds").toURI();
byte[] descriptor = IOUtils.toByteArray(new File(proto).toURI());

String protoDescriptor = Base64.getEncoder().encodeToString(descriptor);
configuration.set(RowParserOptions.PROTO_DESCRIPTOR, protoDescriptor);
configuration.set(RowParserOptions.PROTO_CLASS_NAME, "ProtoTest");
startSendPbDataToKafka(configuration);
updateConfiguration(configuration);
EmbeddedFlinkCluster.submitJob(configuration);
}

@Test
public void testKafkaSourcePbFormatFullTypes() throws Exception {
BitSailConfiguration configuration = JobConfUtils.fromClasspath("kafka_to_print_pb_format_full_types.json");
URI proto = KafkaSourceITCase.class.getClassLoader().getResource("kafka_full_types.fds").toURI();
byte[] descriptor = IOUtils.toByteArray(new File(proto).toURI());

String protoDescriptor = Base64.getEncoder().encodeToString(descriptor);
configuration.set(RowParserOptions.PROTO_DESCRIPTOR, protoDescriptor);
configuration.set(RowParserOptions.PROTO_CLASS_NAME, "ProtoTest");
startSendPbDataToKafka(configuration);
updateConfiguration(configuration);
EmbeddedFlinkCluster.submitJob(configuration);
}

protected void updateConfiguration(BitSailConfiguration jobConfiguration) {
// jobConfiguration.set(FakeReaderOptions.TOTAL_COUNT, TOTAL_SEND_COUNT);

@@ -98,6 +199,7 @@ protected void updateConfiguration(BitSailConfiguration jobConfiguration) {

@After
public void after() {
produceService.shutdown();
kafkaCluster.stopService();
}

7 changes: 7 additions & 0 deletions kafka.fds (new test resource)
@@ -0,0 +1,7 @@
(binary protobuf descriptor generated from kafka.proto; defines message ProtoTest with fields id, name, and date)
7 changes: 7 additions & 0 deletions kafka.proto (new test resource)
@@ -0,0 +1,7 @@
syntax = "proto3";

message ProtoTest {
int64 ID = 1;
string NAME = 2;
string DATE = 3;
}
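The .fds files committed alongside this .proto appear to be serialized FileDescriptorSets (the format protoc emits with --descriptor_set_out), which the ITCase base64-encodes into RowParserOptions.PROTO_DESCRIPTOR. Below is a small sketch, using only protobuf-java APIs, of how such a descriptor can be loaded and used to build a ProtoTest record the way the test producer does; the class name and local file path are illustrative assumptions.

import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;

import java.nio.file.Files;
import java.nio.file.Paths;

public class DescriptorSketch {

  public static byte[] buildSampleRecord(String fdsPath) throws Exception {
    // Parse the serialized descriptor set (e.g. kafka.fds).
    byte[] raw = Files.readAllBytes(Paths.get(fdsPath));
    DescriptorProtos.FileDescriptorSet set = DescriptorProtos.FileDescriptorSet.parseFrom(raw);

    // kafka.proto has no imports, so its descriptor builds without dependencies.
    Descriptors.FileDescriptor file = Descriptors.FileDescriptor
        .buildFrom(set.getFile(0), new Descriptors.FileDescriptor[0]);
    Descriptors.Descriptor protoTest = file.findMessageTypeByName("ProtoTest");

    // Populate one record, mirroring startSendPbDataToKafka for LONG and STRING fields.
    DynamicMessage.Builder builder = DynamicMessage.newBuilder(protoTest);
    for (Descriptors.FieldDescriptor field : protoTest.getFields()) {
      if (field.getJavaType() == Descriptors.FieldDescriptor.JavaType.LONG) {
        builder.setField(field, 1L);
      } else {
        builder.setField(field, "text_1");
      }
    }
    return builder.build().toByteArray();
  }
}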
20 changes: 20 additions & 0 deletions kafka_full_types.fds (new test resource)
@@ -0,0 +1,20 @@
(binary protobuf descriptor generated from kafka_full_types.proto; defines message ProtoTest with fields field1 through field15)
(remaining changed files not shown)
