Skip to content

Commit 11faf34

Browse files
committed
Added a few metrics & started to implement retry logic
1 parent e0f61b3 commit 11faf34

File tree

6 files changed

+171
-25
lines changed

6 files changed

+171
-25
lines changed
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package com.clickhouse.utils;
2+
3+
import com.clickhouse.client.ClickHouseException;
4+
import org.apache.flink.connector.clickhouse.exception.RetriableException;
5+
import org.apache.flink.connector.clickhouse.sink.ClickHouseAsyncWriter;
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
9+
import java.io.IOException;
10+
import java.net.SocketTimeoutException;
11+
import java.net.UnknownHostException;
12+
import java.util.Collection;
13+
14+
public class Utils {
15+
16+
private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
17+
private static final String CLICKHOUSE_CLIENT_ERROR_READ_TIMEOUT_MSG = "Read timed out after";
18+
private static final String CLICKHOUSE_CLIENT_ERROR_WRITE_TIMEOUT_MSG = "Write timed out after";
19+
20+
/**
21+
* This will drill down to the first ClickHouseException in the exception chain
22+
*
23+
* @param e Exception to drill down
24+
* @return ClickHouseException or null if none found
25+
*/
26+
public static Exception getRootCause(Throwable e, Boolean prioritizeClickHouseException) {
27+
if (e == null)
28+
return null;
29+
30+
Throwable runningException = e;//We have to use Throwable because of the getCause() signature
31+
while (runningException.getCause() != null &&
32+
(!prioritizeClickHouseException || !(runningException instanceof ClickHouseException))) {
33+
LOG.trace("Found exception: {}", runningException.getLocalizedMessage());
34+
runningException = runningException.getCause();
35+
}
36+
37+
return runningException instanceof Exception ? (Exception) runningException : null;
38+
}
39+
40+
/**
41+
* This method checks to see if we should retry, otherwise it just throws the exception again
42+
*
43+
* @param e Exception to check
44+
*/
45+
46+
public static void handleException(Throwable e) {
47+
LOG.warn("Deciding how to handle exception: {}", e.getLocalizedMessage());
48+
49+
//Let's check if we have a ClickHouseException to reference the error code
50+
//https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/ErrorCodes.cpp
51+
Exception rootCause = Utils.getRootCause(e, true);
52+
if (rootCause instanceof ClickHouseException) {
53+
ClickHouseException clickHouseException = (ClickHouseException) rootCause;
54+
LOG.warn("ClickHouseException code: {}", clickHouseException.getErrorCode());
55+
switch (clickHouseException.getErrorCode()) {
56+
case 3: // UNEXPECTED_END_OF_FILE
57+
case 107: // FILE_DOESNT_EXIST
58+
case 159: // TIMEOUT_EXCEEDED
59+
case 164: // READONLY
60+
case 202: // TOO_MANY_SIMULTANEOUS_QUERIES
61+
case 203: // NO_FREE_CONNECTION
62+
case 209: // SOCKET_TIMEOUT
63+
case 210: // NETWORK_ERROR
64+
case 241: // MEMORY_LIMIT_EXCEEDED
65+
case 242: // TABLE_IS_READ_ONLY
66+
case 252: // TOO_MANY_PARTS
67+
case 285: // TOO_FEW_LIVE_REPLICAS
68+
case 319: // UNKNOWN_STATUS_OF_INSERT
69+
case 425: // SYSTEM_ERROR
70+
case 999: // KEEPER_EXCEPTION
71+
throw new RetriableException(e);
72+
default:
73+
LOG.error("Error code [{}] wasn't in the acceptable list.", clickHouseException.getErrorCode());
74+
break;
75+
}
76+
}
77+
78+
//Otherwise use Root-Cause Exception Checking
79+
if (rootCause instanceof SocketTimeoutException) {
80+
LOG.warn("SocketTimeoutException thrown, wrapping exception: {}", e.getLocalizedMessage());
81+
throw new RetriableException(e);
82+
} else if (rootCause instanceof UnknownHostException) {
83+
LOG.warn("UnknownHostException thrown, wrapping exception: {}", e.getLocalizedMessage());
84+
throw new RetriableException(e);
85+
} else if (rootCause instanceof IOException) {
86+
final String msg = rootCause.getMessage();
87+
if (msg.indexOf(CLICKHOUSE_CLIENT_ERROR_READ_TIMEOUT_MSG) == 0 || msg.indexOf(CLICKHOUSE_CLIENT_ERROR_WRITE_TIMEOUT_MSG) == 0) {
88+
LOG.warn("IOException thrown, wrapping exception: {}", e.getLocalizedMessage());
89+
throw new RetriableException(e);
90+
}
91+
}
92+
}
93+
94+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package org.apache.flink.connector.clickhouse.exception;
2+
3+
public class FlinkWriteException extends RuntimeException {
4+
private static final long serialVersionUID = 1L;
5+
6+
public FlinkWriteException(Throwable cause) {
7+
8+
}
9+
public FlinkWriteException(String message, Throwable cause) {
10+
super(message, cause);
11+
}
12+
public FlinkWriteException(String message) {
13+
super(message);
14+
}
15+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package org.apache.flink.connector.clickhouse.exception;
2+
3+
public class RetriableException extends FlinkWriteException {
4+
public RetriableException(String message) {
5+
super(message);
6+
}
7+
public RetriableException(String message, Throwable cause) {
8+
super(message, cause);
9+
}
10+
public RetriableException(Throwable cause) {
11+
super(cause);
12+
}
13+
}

flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncWriter.java

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,17 @@
55
import com.clickhouse.client.api.insert.InsertResponse;
66
import com.clickhouse.client.api.insert.InsertSettings;
77
import com.clickhouse.data.ClickHouseFormat;
8+
import com.clickhouse.utils.Utils;
89
import org.apache.flink.api.connector.sink2.WriterInitContext;
910
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
1011
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
1112
import org.apache.flink.connector.base.sink.writer.ElementConverter;
1213
import org.apache.flink.connector.base.sink.writer.ResultHandler;
1314
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
1415
import org.apache.flink.connector.clickhouse.data.ClickHousePayload;
16+
import org.apache.flink.connector.clickhouse.exception.RetriableException;
17+
import org.apache.flink.metrics.Counter;
18+
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
1519
import org.slf4j.Logger;
1620
import org.slf4j.LoggerFactory;
1721

@@ -26,6 +30,10 @@ public class ClickHouseAsyncWriter<InputT> extends AsyncSinkWriter<InputT, Click
2630
private final ClickHouseClientConfig clickHouseClientConfig;
2731
private ClickHouseFormat clickHouseFormat = null;
2832

33+
private final Counter numBytesSendCounter;
34+
private final Counter numRecordsSendCounter;
35+
private final Counter numRequestSubmittedCounter;
36+
2937
public ClickHouseAsyncWriter(ElementConverter<InputT, ClickHousePayload> elementConverter,
3038
WriterInitContext context,
3139
int maxBatchSize,
@@ -50,6 +58,10 @@ public ClickHouseAsyncWriter(ElementConverter<InputT, ClickHousePayload> element
5058
state);
5159
this.clickHouseClientConfig = clickHouseClientConfig;
5260
this.clickHouseFormat = clickHouseFormat;
61+
final SinkWriterMetricGroup metricGroup = context.metricGroup();
62+
this.numBytesSendCounter = metricGroup.getNumBytesSendCounter();
63+
this.numRecordsSendCounter = metricGroup.getNumRecordsSendCounter();
64+
this.numRequestSubmittedCounter = metricGroup.counter("numRequestSubmitted");
5365
}
5466

5567
@Override
@@ -59,25 +71,36 @@ protected long getSizeInBytes(ClickHousePayload clickHousePayload) {
5971

6072
@Override
6173
protected void submitRequestEntries(List<ClickHousePayload> requestEntries, ResultHandler<ClickHousePayload> resultHandler) {
74+
this.numRequestSubmittedCounter.inc();
6275
LOG.info("Submitting request entries...");
63-
AtomicInteger totalSizeSend = new AtomicInteger();
6476
Client chClient = this.clickHouseClientConfig.createClient();
6577
String tableName = clickHouseClientConfig.getTableName();
6678
// TODO: get from constructor or ClickHousePayload need to think what is the best way
6779
ClickHouseFormat format = null;
68-
if (clickHouseFormat == null) {
69-
// this not define lets try to get it from ClickHousePayload in case of POJO can be RowBinary or RowBinaryWithDefaults
70-
} else {
80+
if (clickHouseFormat != null) {
7181
format = clickHouseFormat;
82+
} else {
83+
// TODO: check if we configured payload to POJO serialization.
84+
// this not define lets try to get it from ClickHousePayload in case of POJO can be RowBinary or RowBinaryWithDefaults
85+
Boolean supportDefault = clickHouseClientConfig.getSupportDefault();
86+
if (supportDefault != null) {
87+
if (supportDefault) format = ClickHouseFormat.RowBinaryWithDefaults;
88+
else format = ClickHouseFormat.RowBinary;
89+
} else {
90+
throw new RuntimeException("ClickHouseFormat was not set ");
91+
}
7292
}
7393
try {
7494
CompletableFuture<InsertResponse> response = chClient.insert(tableName, out -> {
7595
for (ClickHousePayload requestEntry : requestEntries) {
7696
byte[] payload = requestEntry.getPayload();
77-
totalSizeSend.addAndGet(payload.length);
97+
// sum the data that is sent to ClickHouse
98+
this.numBytesSendCounter.inc(payload.length);
7899
out.write(payload);
79100
}
80-
LOG.info("Data that will be send to ClickHouse in bytes {} and the amount of records {}.", totalSizeSend.get(), requestEntries.size());
101+
// send the number that is sent to ClickHouse
102+
this.numRecordsSendCounter.inc(requestEntries.size());
103+
LOG.info("Data that will be send to ClickHouse in bytes {} and the amount of records {}.", numBytesSendCounter.getCount(), requestEntries.size());
81104
out.close();
82105
}, format, new InsertSettings().setOption(ClientConfigProperties.ASYNC_OPERATIONS.getKey(), "true"));
83106
response.whenComplete((insertResponse, throwable) -> {
@@ -108,9 +131,16 @@ private void handleFailedRequest(
108131
List<ClickHousePayload> requestEntries,
109132
ResultHandler<ClickHousePayload> resultHandler,
110133
Throwable error) {
111-
// TODO extract from error if we can retry
112-
error.printStackTrace();
113-
134+
// TODO: extract from error if we can retry
135+
try {
136+
Utils.handleException(error);
137+
} catch (RetriableException e) {
138+
LOG.info("Retriable exception occurred while processing request. ", e);
139+
// TODO: send data again
140+
resultHandler.retryForEntries(requestEntries);
141+
}
142+
LOG.info("completeExceptionally");
143+
resultHandler.completeExceptionally((Exception)error);
114144
}
115145

116146
}

flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseClientConfig.java

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,14 @@ public class ClickHouseClientConfig implements Serializable {
1515
private final String password;
1616
private final String database;
1717
private final String tableName;
18-
// private List<Class<?>> classToReisterList = null;
19-
// private List<TableSchema> tableSchemaList = null;
18+
private Boolean supportDefault = null;
2019

2120
public ClickHouseClientConfig(String url, String username, String password, String database, String tableName) {
2221
this.url = url;
2322
this.username = username;
2423
this.password = password;
2524
this.database = database;
2625
this.tableName = tableName;
27-
// this.classToReisterList = new ArrayList<>();
28-
// this.tableSchemaList = new ArrayList<>();
2926
}
3027

3128
public Client createClient(String database) {
@@ -36,24 +33,20 @@ public Client createClient(String database) {
3633
.setDefaultDatabase(database)
3734
.setOption(ClientConfigProperties.ASYNC_OPERATIONS.getKey(), "true")
3835
.build();
39-
// if (classToReisterList != null) {
40-
// for (int index = 0; index < classToReisterList.size(); index++) {
41-
// client.register(classToReisterList.get(index), tableSchemaList.get(index));
42-
// }
43-
// }
4436
return client;
4537
}
4638

47-
// public void registerClass(Class<?> clazz, TableSchema tableSchema) {
48-
// classToReisterList.add(clazz);
49-
// tableSchemaList.add(tableSchema);
50-
// }
51-
5239
public Client createClient() {
5340
return createClient(this.database);
5441
}
5542

5643
public String getTableName() { return tableName; }
5744

45+
public void setSupportDefault(Boolean supportDefault) {
46+
this.supportDefault = supportDefault;
47+
}
5848

49+
public Boolean getSupportDefault() {
50+
return supportDefault;
51+
}
5952
}

flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ void CovidPOJODataTest() throws Exception {
137137
env.setParallelism(5);
138138

139139
ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(getServerURL(), getUsername(), getPassword(), getDatabase(), tableName);
140+
clickHouseClientConfig.setSupportDefault(covidTableSchema.hasDefaults());
140141
ElementConverter<CovidPOJO, ClickHousePayload> convertorCovid = new ClickHouseConvertor<>(CovidPOJO.class, covidPOJOConvertor);
141142

142143
ClickHouseAsyncSink<CovidPOJO> covidPOJOSink = new ClickHouseAsyncSink<>(
@@ -150,8 +151,6 @@ void CovidPOJODataTest() throws Exception {
150151
clickHouseClientConfig
151152
);
152153

153-
covidPOJOSink.setClickHouseFormat(ClickHouseFormat.RowBinary);
154-
155154
Path filePath = new Path("./src/test/resources/epidemiology_top_10000.csv.gz");
156155

157156
FileSource<String> source = FileSource
@@ -212,6 +211,8 @@ void SimplePOJODataTest() throws Exception {
212211
env.setParallelism(5);
213212

214213
ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(getServerURL(), getUsername(), getPassword(), getDatabase(), tableName);
214+
clickHouseClientConfig.setSupportDefault(simpleTableSchema.hasDefaults());
215+
215216
ElementConverter<SimplePOJO, ClickHousePayload> convertorCovid = new ClickHouseConvertor<>(SimplePOJO.class, simplePOJOConvertor);
216217

217218
ClickHouseAsyncSink<SimplePOJO> simplePOJOSink = new ClickHouseAsyncSink<>(

0 commit comments

Comments
 (0)