diff --git a/CHANGELOG.md b/CHANGELOG.md
index e69de29..0072b54 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -0,0 +1,2 @@
+## 0.1.0
+* ClickHouse Sink supports Apache Flink 1.17+
\ No newline at end of file
diff --git a/README.md b/README.md
index 83d5850..3e88d89 100644
--- a/README.md
+++ b/README.md
@@ -48,7 +48,7 @@ Maven
com.clickhouse.flink
flink-connector-clickhouse-2.0.0
- 0.0.1
+ 0.1.0
pom
```
@@ -61,7 +61,7 @@ Maven
com.clickhouse.flink
flink-connector-clickhouse-1.17
- 0.0.1
+ 0.1.0
pom
```
diff --git a/build.gradle.kts b/build.gradle.kts
index de2f187..d909c0e 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -6,7 +6,7 @@ plugins {
id("com.gradleup.shadow") version "9.0.2"
}
-val sinkVersion by extra("0.0.1")
+val sinkVersion by extra("0.1.0")
val flinkVersion by extra("1.18.0")
val clickhouseVersion by extra("0.9.1")
val junitVersion by extra("5.8.2")
diff --git a/examples/maven/flink-v1.7/covid/pom.xml b/examples/maven/flink-v1.7/covid/pom.xml
index a6bf8b9..9de20b0 100644
--- a/examples/maven/flink-v1.7/covid/pom.xml
+++ b/examples/maven/flink-v1.7/covid/pom.xml
@@ -78,7 +78,7 @@ under the License.
com.clickhouse.flink
flink-connector-clickhouse-1.17
- 0.0.1
+ 0.1.0
all
diff --git a/examples/maven/flink-v2/covid/pom.xml b/examples/maven/flink-v2/covid/pom.xml
index 9128176..c8ee7f2 100644
--- a/examples/maven/flink-v2/covid/pom.xml
+++ b/examples/maven/flink-v2/covid/pom.xml
@@ -78,7 +78,7 @@ under the License.
com.clickhouse.flink
flink-connector-clickhouse-2.0.0
- 0.0.1
+ 0.1.0
all
diff --git a/examples/sbt/covid/build.sbt b/examples/sbt/covid/build.sbt
index 69a19fc..8b83dda 100644
--- a/examples/sbt/covid/build.sbt
+++ b/examples/sbt/covid/build.sbt
@@ -14,7 +14,7 @@ libraryDependencies ++= Seq(
"org.apache.flink" % "flink-streaming-java" % flinkVersion % "provided",
"org.apache.flink" % "flink-clients" % flinkVersion % "provided",
"org.apache.flink" % "flink-connector-files" % "2.0.0" % "provided",
- "org.apache.flink.connector" % "clickhouse" % "0.0.1" classifier "all"
+ "org.apache.flink.connector" % "clickhouse" % "0.1.0" classifier "all"
)
assembly / assemblyJarName := "covid.jar"
diff --git a/flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java b/flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java
index 7f9df0c..2873279 100644
--- a/flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java
+++ b/flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java
@@ -310,11 +310,16 @@ void ProductNameTest() throws Exception {
lines.sinkTo(csvSink);
int rows = executeAsyncJob(env, tableName, 10, EXPECTED_ROWS);
Assertions.assertEquals(EXPECTED_ROWS, rows);
- ClickHouseServerForTests.executeSql("SYSTEM FLUSH LOGS");
if (ClickHouseServerForTests.isCloud())
- Thread.sleep(5000);
+ ClickHouseServerForTests.executeSql("SYSTEM FLUSH LOGS ON CLUSTER 'default'");
+ else
+ ClickHouseServerForTests.executeSql("SYSTEM FLUSH LOGS");
+
+ if (ClickHouseServerForTests.isCloud())
+ Thread.sleep(10000);
// let's wait until data will be available in query log
- String productName = ClickHouseServerForTests.extractProductName(ClickHouseServerForTests.getDatabase(), tableName);
+ String startWith = String.format("Flink-ClickHouse-Sink/%s", ClickHouseSinkVersion.getVersion());
+ String productName = ClickHouseServerForTests.extractProductName(ClickHouseServerForTests.getDatabase(), tableName, startWith);
String compareString = String.format("Flink-ClickHouse-Sink/%s (fv:flink/%s, lv:scala/2.12)", ClickHouseSinkVersion.getVersion(), flinkVersion);
boolean isContains = productName.contains(compareString);
diff --git a/flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java b/flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java
index 2a72dff..cc74c45 100644
--- a/flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java
+++ b/flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java
@@ -123,8 +123,8 @@ public static int countRows(String table) throws ExecutionException, Interrupted
return countResult.get(0).getInteger(1);
}
// http_user_agent
- public static String extractProductName(String databaseName, String tableName) {
- String extractProductName = String.format("SELECT http_user_agent, tables FROM clusterAllReplicas('default', system.query_log) WHERE type = 'QueryStart' AND query_kind = 'Insert' AND has(databases,'%s') AND has(tables,'%s.%s') LIMIT 100", databaseName, databaseName, tableName);
+ public static String extractProductName(String databaseName, String tableName, String startWith) {
+ String extractProductName = String.format("SELECT http_user_agent, tables FROM clusterAllReplicas('default', system.query_log) WHERE type = 'QueryStart' AND query_kind = 'Insert' AND has(databases,'%s') AND has(tables,'%s.%s') and startsWith(http_user_agent, '%s') LIMIT 100", databaseName, databaseName, tableName, startWith);
Client client = ClickHouseTestHelpers.getClient(host, port, isSSL, username, password);
List userAgentResult = client.queryAll(extractProductName);
if (!userAgentResult.isEmpty()) {
diff --git a/flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java b/flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java
index c755395..53cc690 100644
--- a/flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java
+++ b/flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java
@@ -304,11 +304,16 @@ void ProductNameTest() throws Exception {
lines.sinkTo(csvSink);
int rows = executeAsyncJob(env, tableName, 10, EXPECTED_ROWS);
Assertions.assertEquals(EXPECTED_ROWS, rows);
- ClickHouseServerForTests.executeSql("SYSTEM FLUSH LOGS");
if (ClickHouseServerForTests.isCloud())
- Thread.sleep(5000);
+ ClickHouseServerForTests.executeSql("SYSTEM FLUSH LOGS ON CLUSTER 'default'");
+ else
+ ClickHouseServerForTests.executeSql("SYSTEM FLUSH LOGS");
+
+ if (ClickHouseServerForTests.isCloud())
+ Thread.sleep(10000);
// let's wait until data will be available in query log
- String productName = ClickHouseServerForTests.extractProductName(ClickHouseServerForTests.getDatabase(), tableName, "Flink-ClickHouse-Sink");
+ String startWith = String.format("Flink-ClickHouse-Sink/%s", ClickHouseSinkVersion.getVersion());
+ String productName = ClickHouseServerForTests.extractProductName(ClickHouseServerForTests.getDatabase(), tableName, startWith);
String compareString = String.format("Flink-ClickHouse-Sink/%s (fv:flink/2.0.0, lv:scala/2.12)", ClickHouseSinkVersion.getVersion());
boolean isContains = productName.contains(compareString);
Assertions.assertTrue(isContains, "Expected user agent to contain: " + compareString + " but got: " + productName);
diff --git a/flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java b/flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java
index b47c5db..ed1e935 100644
--- a/flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java
+++ b/flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java
@@ -124,15 +124,15 @@ public static int countRows(String table) throws ExecutionException, Interrupted
}
// http_user_agent
- public static String extractProductName(String databaseName, String tableName, String productNameStartWith) {
- String extractProductName = String.format("SELECT http_user_agent, tables FROM clusterAllReplicas('default', system.query_log) WHERE type = 'QueryStart' AND query_kind = 'Insert' AND has(databases,'%s') AND has(tables,'%s.%s') LIMIT 100", databaseName, databaseName, tableName);
+ public static String extractProductName(String databaseName, String tableName, String startWith) {
+ String extractProductName = String.format("SELECT http_user_agent, tables FROM clusterAllReplicas('default', system.query_log) WHERE type = 'QueryStart' AND query_kind = 'Insert' AND has(databases,'%s') AND has(tables,'%s.%s') and startsWith(http_user_agent, '%s') LIMIT 100", databaseName, databaseName, tableName, startWith);
Client client = ClickHouseTestHelpers.getClient(host, port, isSSL, username, password);
List userAgentResult = client.queryAll(extractProductName);
String userAgentValue = null;
if (!userAgentResult.isEmpty()) {
for (GenericRecord userAgent : userAgentResult) {
userAgentValue = userAgent.getString(1);
- if (userAgentValue.contains(productNameStartWith))
+ if (userAgentValue.contains(startWith))
return userAgent.getString(1);
}
throw new RuntimeException("Can not extract product name from " + userAgentValue);
diff --git a/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkVersion.java b/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkVersion.java
index ae8b6ca..7138a02 100644
--- a/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkVersion.java
+++ b/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkVersion.java
@@ -2,6 +2,6 @@
public class ClickHouseSinkVersion {
public static String getVersion() {
- return "0.0.1";
+ return "0.1.0";
}
}