Skip to content

Commit c91541f

Browse files
authored
Merge pull request #44 from ClickHouse/adjust-version-1.0.0
Adjust sink version
2 parents 9b58bb4 + bc4e4a3 commit c91541f

File tree

11 files changed

+30
-18
lines changed

11 files changed

+30
-18
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
## 0.1.0
2+
* ClickHouse Sink supports Apache Flink 1.17+

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ Maven
4848
<dependency>
4949
<groupId>com.clickhouse.flink</groupId>
5050
<artifactId>flink-connector-clickhouse-2.0.0</artifactId>
51-
<version>0.0.1</version>
51+
<version>0.1.0</version>
5252
<type>pom</type>
5353
</dependency>
5454
```
@@ -61,7 +61,7 @@ Maven
6161
<dependency>
6262
<groupId>com.clickhouse.flink</groupId>
6363
<artifactId>flink-connector-clickhouse-1.17</artifactId>
64-
<version>0.0.1</version>
64+
<version>0.1.0</version>
6565
<type>pom</type>
6666
</dependency>
6767
```

build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ plugins {
66
id("com.gradleup.shadow") version "9.0.2"
77
}
88

9-
val sinkVersion by extra("0.0.1")
9+
val sinkVersion by extra("0.1.0")
1010
val flinkVersion by extra("1.18.0")
1111
val clickhouseVersion by extra("0.9.1")
1212
val junitVersion by extra("5.8.2")

examples/maven/flink-v1.7/covid/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ under the License.
7878
<dependency>
7979
<groupId>com.clickhouse.flink</groupId>
8080
<artifactId>flink-connector-clickhouse-1.17</artifactId>
81-
<version>0.0.1</version>
81+
<version>0.1.0</version>
8282
<classifier>all</classifier>
8383
</dependency>
8484

examples/maven/flink-v2/covid/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ under the License.
7878
<dependency>
7979
<groupId>com.clickhouse.flink</groupId>
8080
<artifactId>flink-connector-clickhouse-2.0.0</artifactId>
81-
<version>0.0.1</version>
81+
<version>0.1.0</version>
8282
<classifier>all</classifier>
8383
</dependency>
8484

examples/sbt/covid/build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ libraryDependencies ++= Seq(
1414
"org.apache.flink" % "flink-streaming-java" % flinkVersion % "provided",
1515
"org.apache.flink" % "flink-clients" % flinkVersion % "provided",
1616
"org.apache.flink" % "flink-connector-files" % "2.0.0" % "provided",
17-
"org.apache.flink.connector" % "clickhouse" % "0.0.1" classifier "all"
17+
"org.apache.flink.connector" % "clickhouse" % "0.1.0" classifier "all"
1818
)
1919

2020
assembly / assemblyJarName := "covid.jar"

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -310,11 +310,16 @@ void ProductNameTest() throws Exception {
310310
lines.sinkTo(csvSink);
311311
int rows = executeAsyncJob(env, tableName, 10, EXPECTED_ROWS);
312312
Assertions.assertEquals(EXPECTED_ROWS, rows);
313-
ClickHouseServerForTests.executeSql("SYSTEM FLUSH LOGS");
314313
if (ClickHouseServerForTests.isCloud())
315-
Thread.sleep(5000);
314+
ClickHouseServerForTests.executeSql("SYSTEM FLUSH LOGS ON CLUSTER 'default'");
315+
else
316+
ClickHouseServerForTests.executeSql("SYSTEM FLUSH LOGS");
317+
318+
if (ClickHouseServerForTests.isCloud())
319+
Thread.sleep(10000);
316320
// let's wait until data will be available in query log
317-
String productName = ClickHouseServerForTests.extractProductName(ClickHouseServerForTests.getDatabase(), tableName);
321+
String startWith = String.format("Flink-ClickHouse-Sink/%s", ClickHouseSinkVersion.getVersion());
322+
String productName = ClickHouseServerForTests.extractProductName(ClickHouseServerForTests.getDatabase(), tableName, startWith);
318323
String compareString = String.format("Flink-ClickHouse-Sink/%s (fv:flink/%s, lv:scala/2.12)", ClickHouseSinkVersion.getVersion(), flinkVersion);
319324

320325
boolean isContains = productName.contains(compareString);

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,8 @@ public static int countRows(String table) throws ExecutionException, Interrupted
123123
return countResult.get(0).getInteger(1);
124124
}
125125
// http_user_agent
126-
public static String extractProductName(String databaseName, String tableName) {
127-
String extractProductName = String.format("SELECT http_user_agent, tables FROM clusterAllReplicas('default', system.query_log) WHERE type = 'QueryStart' AND query_kind = 'Insert' AND has(databases,'%s') AND has(tables,'%s.%s') LIMIT 100", databaseName, databaseName, tableName);
126+
public static String extractProductName(String databaseName, String tableName, String startWith) {
127+
String extractProductName = String.format("SELECT http_user_agent, tables FROM clusterAllReplicas('default', system.query_log) WHERE type = 'QueryStart' AND query_kind = 'Insert' AND has(databases,'%s') AND has(tables,'%s.%s') and startsWith(http_user_agent, '%s') LIMIT 100", databaseName, databaseName, tableName, startWith);
128128
Client client = ClickHouseTestHelpers.getClient(host, port, isSSL, username, password);
129129
List<GenericRecord> userAgentResult = client.queryAll(extractProductName);
130130
if (!userAgentResult.isEmpty()) {

flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -304,11 +304,16 @@ void ProductNameTest() throws Exception {
304304
lines.sinkTo(csvSink);
305305
int rows = executeAsyncJob(env, tableName, 10, EXPECTED_ROWS);
306306
Assertions.assertEquals(EXPECTED_ROWS, rows);
307-
ClickHouseServerForTests.executeSql("SYSTEM FLUSH LOGS");
308307
if (ClickHouseServerForTests.isCloud())
309-
Thread.sleep(5000);
308+
ClickHouseServerForTests.executeSql("SYSTEM FLUSH LOGS ON CLUSTER 'default'");
309+
else
310+
ClickHouseServerForTests.executeSql("SYSTEM FLUSH LOGS");
311+
312+
if (ClickHouseServerForTests.isCloud())
313+
Thread.sleep(10000);
310314
// let's wait until data will be available in query log
311-
String productName = ClickHouseServerForTests.extractProductName(ClickHouseServerForTests.getDatabase(), tableName, "Flink-ClickHouse-Sink");
315+
String startWith = String.format("Flink-ClickHouse-Sink/%s", ClickHouseSinkVersion.getVersion());
316+
String productName = ClickHouseServerForTests.extractProductName(ClickHouseServerForTests.getDatabase(), tableName, startWith);
312317
String compareString = String.format("Flink-ClickHouse-Sink/%s (fv:flink/2.0.0, lv:scala/2.12)", ClickHouseSinkVersion.getVersion());
313318
boolean isContains = productName.contains(compareString);
314319
Assertions.assertTrue(isContains, "Expected user agent to contain: " + compareString + " but got: " + productName);

flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -124,15 +124,15 @@ public static int countRows(String table) throws ExecutionException, Interrupted
124124
}
125125

126126
// http_user_agent
127-
public static String extractProductName(String databaseName, String tableName, String productNameStartWith) {
128-
String extractProductName = String.format("SELECT http_user_agent, tables FROM clusterAllReplicas('default', system.query_log) WHERE type = 'QueryStart' AND query_kind = 'Insert' AND has(databases,'%s') AND has(tables,'%s.%s') LIMIT 100", databaseName, databaseName, tableName);
127+
public static String extractProductName(String databaseName, String tableName, String startWith) {
128+
String extractProductName = String.format("SELECT http_user_agent, tables FROM clusterAllReplicas('default', system.query_log) WHERE type = 'QueryStart' AND query_kind = 'Insert' AND has(databases,'%s') AND has(tables,'%s.%s') and startsWith(http_user_agent, '%s') LIMIT 100", databaseName, databaseName, tableName, startWith);
129129
Client client = ClickHouseTestHelpers.getClient(host, port, isSSL, username, password);
130130
List<GenericRecord> userAgentResult = client.queryAll(extractProductName);
131131
String userAgentValue = null;
132132
if (!userAgentResult.isEmpty()) {
133133
for (GenericRecord userAgent : userAgentResult) {
134134
userAgentValue = userAgent.getString(1);
135-
if (userAgentValue.contains(productNameStartWith))
135+
if (userAgentValue.contains(startWith))
136136
return userAgent.getString(1);
137137
}
138138
throw new RuntimeException("Can not extract product name from " + userAgentValue);

0 commit comments

Comments
 (0)