Skip to content

Commit a77846e

Browse files
committed
Added some enhancements to ClickHouseAsyncSinkSerializer
1 parent c17bb06 commit a77846e

File tree

1 file changed

+13
-5
lines changed

1 file changed

+13
-5
lines changed

flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSinkSerializer.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,23 +11,31 @@
1111

1212
public class ClickHouseAsyncSinkSerializer extends AsyncSinkWriterStateSerializer<ClickHousePayload> {
1313
private static final Logger LOG = LoggerFactory.getLogger(ClickHouseAsyncSinkSerializer.class);
14-
14+
private static final int V1 = 1;
1515
@Override
1616
protected void serializeRequestToStream(ClickHousePayload clickHousePayload, DataOutputStream dataOutputStream) throws IOException {
1717
byte[] bytes = clickHousePayload.getPayload();
18-
dataOutputStream.writeInt(bytes.length);
19-
dataOutputStream.write(bytes);
18+
if (bytes != null) {
19+
dataOutputStream.writeInt(bytes.length);
20+
dataOutputStream.write(bytes);
21+
} else {
22+
dataOutputStream.writeInt(-1);
23+
}
24+
2025
}
2126

2227
private ClickHousePayload deserializeV1(DataInputStream dataInputStream) throws IOException {
2328
int len = dataInputStream.readInt();
29+
if (len == -1) {
30+
return new ClickHousePayload(null);
31+
}
2432
byte[] payload = dataInputStream.readNBytes(len);
2533
return new ClickHousePayload(payload);
2634
}
2735

2836
@Override
2937
protected ClickHousePayload deserializeRequestFromStream(long version, DataInputStream dataInputStream) throws IOException {
30-
if (version == 1) {
38+
if (version == V1) {
3139
return deserializeV1(dataInputStream);
3240
} else {
3341
throw new IOException("Unsupported version: " + version);
@@ -36,6 +44,6 @@ protected ClickHousePayload deserializeRequestFromStream(long version, DataInput
3644

3745
@Override
3846
public int getVersion() {
39-
return 1;
47+
return V1;
4048
}
4149
}

0 commit comments

Comments
 (0)