Skip to content

Commit

Permalink
to bytedance#103, pruned test cases for Kafka PB format
Browse files Browse the repository at this point in the history
  • Loading branch information
qidian99 committed Nov 19, 2022
1 parent bc60582 commit 87e5e04
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 1,002 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bitsail-component-formats</artifactId>
<groupId>com.bytedance.bitsail</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>bitsail-component-format-pb</artifactId>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

</project>
1 change: 1 addition & 0 deletions bitsail-components/bitsail-component-formats/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
<packaging>pom</packaging>
<modules>
<module>bitsail-component-format-json</module>
<module>bitsail-component-format-pb</module>
</modules>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public static KafkaDeserializationSchema<Row> getDeserializationSchema(BitSailCo

if (StringUtils.equalsIgnoreCase(PB_DESERIALIZATION_SCHEMA_KEY, formatType)) {
try {
return new PbKafkaDeserializationSchema(configuration);
return new CountKafkaDeserializationSchemaWrapper<>(configuration, new PbDeserializationSchema(configuration));
} catch (Exception e) {
throw new IllegalArgumentException("Pb parser encountered error during initialization.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,20 @@
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.io.IOException;
import java.util.List;

@Internal
public class PbKafkaDeserializationSchema implements KafkaDeserializationSchema<Row> {
public class PbDeserializationSchema implements DeserializationSchema<Row> {
private static final long serialVersionUID = -2556547991095476394L;
private final PbBytesParser parser;
private final RowTypeInfo rowTypeInfo;
private final int arity;

public PbKafkaDeserializationSchema(BitSailConfiguration jobConf) throws Exception {
public PbDeserializationSchema(BitSailConfiguration jobConf) throws Exception {
this.parser = new PbBytesParser(jobConf);

List<Descriptors.FieldDescriptor> fields = parser.getDescriptor().getFields();
Expand Down Expand Up @@ -77,23 +76,21 @@ private PrimitiveColumnTypeInfo<?> getColumnTypeInfo(Descriptors.FieldDescriptor

@Override
public void open(DeserializationSchema.InitializationContext context) throws Exception {
KafkaDeserializationSchema.super.open(context);
}

@Override
public boolean isEndOfStream(Row row) {
return false;
public Row deserialize(byte[] value) throws IOException {
return this.parser.parse(new Row(arity), value, 0, value.length, null, rowTypeInfo);
}

@Override
public Row deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
byte[] value = record.value();
return this.parser.parse(new Row(arity), value, 0, value.length, null, rowTypeInfo);
public void deserialize(byte[] value, Collector<Row> out) throws IOException {
out.collect(deserialize(value));
}

@Override
public void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<Row> out) throws Exception {
out.collect(deserialize(message));
public boolean isEndOfStream(Row row) {
return false;
}

@Override
Expand Down
Loading

0 comments on commit 87e5e04

Please sign in to comment.