Skip to content

Commit 4dda9ca

Browse files
committed
Add logging & ignore if payload is null before sending
1 parent 4fc155b commit 4dda9ca

File tree

2 files changed

+7
-4
lines changed

2 files changed

+7
-4
lines changed

flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/convertor/ClickHouseConvertor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public ClickHousePayload apply( InputT o, SinkWriter.Context context) {
6565
byte[] payload = this.pojoConvertor.convert(o);
6666
return new ClickHousePayload(payload);
6767
} catch (Exception e) {
68+
LOG.error("Failed to convert ClickHouse payload", e);
6869
return new ClickHousePayload(null);
6970
}
7071
}

flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncWriter.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,12 @@ protected void submitRequestEntries(List<ClickHousePayload> requestEntries, Resu
9393
try {
9494
CompletableFuture<InsertResponse> response = chClient.insert(tableName, out -> {
9595
for (ClickHousePayload requestEntry : requestEntries) {
96-
byte[] payload = requestEntry.getPayload();
97-
// sum the data that is sent to ClickHouse
98-
this.numBytesSendCounter.inc(payload.length);
99-
out.write(payload);
96+
if (requestEntry.getPayload() != null) {
97+
byte[] payload = requestEntry.getPayload();
98+
// sum the data that is sent to ClickHouse
99+
this.numBytesSendCounter.inc(payload.length);
100+
out.write(payload);
101+
}
100102
}
101103
// send the number that is sent to ClickHouse
102104
this.numRecordsSendCounter.inc(requestEntries.size());

0 commit comments

Comments
 (0)