Skip to content

Commit 9b58bb4

Browse files
authored
Merge pull request #40 from ClickHouse/update-readme
Update readme
2 parents 4076f9d + a233eb0 commit 9b58bb4

File tree

5 files changed

+239
-7
lines changed

5 files changed

+239
-7
lines changed

README.md

Lines changed: 170 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,42 +8,205 @@ Table of Contents
88
* [Supported Flink Versions](#supported-flink-versions)
99
* [Installation](#installation)
1010
* [DataStream API](#dataStream-api)
11-
* [Snippets](#snippets)
12-
* [Examples](#examples)
11+
* [Snippets](#snippet)
12+
* [Examples](#example)
1313
* [Table API](#table-api)
14-
* [Artifacts](#artifacts-1)
15-
* [Examples](#examples-1)
14+
* [Snippets](#snippet-1)
15+
* [Examples](#example-1)
16+
* [Supported ClickHouse Types](#supported-clickHouse-types)
17+
* [Configuration Options](#configuration-options)
18+
* [Client Configuration](#client-configuration)
19+
* [Sink Configuration](#sink-configuration)
20+
* [Limitations](#limitations)
1621
* [Contributing](#contributing)
1722

1823
## About The Project
1924

2025
This is a repo of ClickHouse official Apache Flink Connector supported by the ClickHouse team.
21-
The Connector supports to main Apache Flink API's
26+
The connector supports two main Apache Flink APIs:
2227
- DataStreamAPI
23-
- Table API
28+
- Table API (This feature is not implemented yet and is planned for a future release)
2429

2530
## Supported Flink Versions
2631

32+
| Version | Dependency | ClickHouse Client Version | Required Java |
33+
|---------|----------------------------------|---------------------------|---------------|
34+
| latest | flink-connector-clickhouse-2.0.0 | 0.9.1 | Java 17+ |
35+
| 2.0.0 | flink-connector-clickhouse-2.0.0 | 0.9.1 | Java 17+ |
36+
| 1.20.2 | flink-connector-clickhouse-1.17 | 0.9.1 | Java 11+ |
37+
| 1.19.3 | flink-connector-clickhouse-1.17 | 0.9.1 | Java 11+ |
38+
| 1.18.1 | flink-connector-clickhouse-1.17 | 0.9.1 | Java 11+ |
39+
| 1.17.2 | flink-connector-clickhouse-1.17 | 0.9.1 | Java 11+ |
40+
2741
## Installation
2842

43+
### For Flink 2.0.0+
44+
45+
Maven
46+
47+
```xml
48+
<dependency>
49+
<groupId>com.clickhouse.flink</groupId>
50+
<artifactId>flink-connector-clickhouse-2.0.0</artifactId>
51+
<version>0.0.1</version>
52+
<type>pom</type>
53+
</dependency>
54+
```
55+
56+
### For Flink 1.17+
57+
58+
Maven
59+
60+
```xml
61+
<dependency>
62+
<groupId>com.clickhouse.flink</groupId>
63+
<artifactId>flink-connector-clickhouse-1.17</artifactId>
64+
<version>0.0.1</version>
65+
<type>pom</type>
66+
</dependency>
67+
```
68+
2969
## DataStream API
3070

3171
### Snippet
3272

73+
Configure ClickHouseClient
74+
75+
```java
76+
ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(url, username, password, database, tableName);
77+
```
78+
If you are planning to insert RAW CSV data as is
79+
80+
Create an ElementConverter
81+
82+
```java
83+
ElementConverter<String, ClickHousePayload> convertorString = new ClickHouseConvertor<>(String.class);
84+
```
85+
86+
Create the sink and set the format using `setClickHouseFormat`
87+
88+
```java
89+
ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
90+
convertorString,
91+
MAX_BATCH_SIZE,
92+
MAX_IN_FLIGHT_REQUESTS,
93+
MAX_BUFFERED_REQUESTS,
94+
MAX_BATCH_SIZE_IN_BYTES,
95+
MAX_TIME_IN_BUFFER_MS,
96+
MAX_RECORD_SIZE_IN_BYTES,
97+
clickHouseClientConfig
98+
);
99+
100+
csvSink.setClickHouseFormat(ClickHouseFormat.CSV);
101+
```
102+
103+
Finally, connect your DataStream to the sink.
104+
105+
```java
106+
data.sinkTo(csvSink);
107+
```
108+
109+
More examples and snippets can be found in our tests [flink-connector-clickhouse-1.17](flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink) and [flink-connector-clickhouse-2.0.0](flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/clickhouse/sink)
110+
33111
### Example
34112

113+
We have created maven based example for easy start with ClickHouse Sink
114+
Different versions for Flink
115+
116+
- [Flink 1.17+](examples/maven/flink-v1.7/covid)
117+
- [Flink 2.0.0+](examples/maven/flink-v2/covid)
118+
119+
For more detailed instructions, see the [Example Guide](examples#readme)
35120

36121
## Table API
37122

123+
Table API is planned for a future release. This section will be updated once available.
124+
38125
### Snippet
39126

127+
Planned for a future release — this section will provide a usage snippet for configuring the Table API.
128+
40129
### Example
41130

131+
Planned for a future release — a complete end-to-end example will be added once the Table API becomes available.
132+
133+
## Supported ClickHouse Types
134+
135+
| Java Type | ClickHouse Type | Supported | Serialize Method |
136+
|-----------------|-----------------|-----------|-------------------------|
137+
| byte/Byte | Int8 || Serialize.writeInt8 |
138+
| short/Short | Int16 || Serialize.writeInt16 |
139+
| int/Integer | Int32 || Serialize.writeInt32 |
140+
| long/Long | Int64 || Serialize.writeInt64 |
141+
| BigInteger | Int128 || Serialize.writeInt124 |
142+
| BigInteger | Int256 || Serialize.writeInt256 |
143+
| byte/Byte | UInt8 || N/A |
144+
| short/Short | UInt16 || N/A |
145+
| int/Integer | UInt32 || N/A |
146+
| long/Long | UInt64 || N/A |
147+
| BigInteger | UInt128 || N/A |
148+
| BigInteger | UInt256 || N/A |
149+
| BigDecimal | Decimal || N/A |
150+
| BigDecimal | Decimal32 || N/A |
151+
| BigDecimal | Decimal64 || N/A |
152+
| BigDecimal | Decimal128 || N/A |
153+
| BigDecimal | Decimal256 || N/A |
154+
| float/Float | Float || Serialize.writeFloat32 |
155+
| double/Double | Double || Serialize.writeFloat64 |
156+
| boolean/Boolean | Boolean || Serialize.writeBoolean |
157+
| String | String || Serialize.writeString |
158+
| String | FixedString || N/A |
159+
| LocalDate | Date || N/A |
160+
| LocalDate | Date32 || N/A |
161+
| LocalDateTime | DateTime || N/A |
162+
| LocalDateTime | DateTime64 || N/A |
163+
| int/Integer | Time || N/A |
164+
| long/Long | Time64 || N/A |
165+
| byte/Byte | Enum8 || Serialize.writeInt8 |
166+
| int/Integer | Enum16 || Serialize.writeInt16 |
167+
| String | JSON || N/A |
168+
| Array<Type> | Array<Type> || N/A |
169+
| Map<K,V> | Map<K,V> || N/A |
170+
| Tuple<Type,..> | Map<T1,T2,..> || N/A |
171+
| Object | Variant || N/A |
172+
173+
* For date operation need to provide ZoneId.
174+
175+
## Configuration Options
176+
177+
### Client configuration
178+
179+
| Parameters | Description | Default Value |
180+
|---------------|------------------------------|----------|
181+
| url | fully qualified URL | N/A |
182+
| username | ClickHouse database username | N/A |
183+
| password | ClickHouse database password | N/A |
184+
| database | ClickHouse database name | N/A |
185+
| table | ClickHouse table name | N/A |
186+
187+
### Sink configuration
188+
189+
Our Sink is built on top of Flink’s `AsyncSinkBase`
190+
191+
| Parameters | Description | Default Value |
192+
|---------------|---------------------------------------------------------------------------------------|----------|
193+
| maxBatchSize | Maximum number of records inserted in a single batch | N/A |
194+
| maxInFlightRequests | The maximum number of in flight requests allowed before the sink applies backpressure | N/A |
195+
| maxBufferedRequests | The maximum number of records that may be buffered in the sink before backpressure is applied | N/A |
196+
| maxBatchSizeInBytes | The maximum size (in bytes) a batch may become. All batches sent will be smaller than or equal to this size | N/A |
197+
| maxTimeInBufferMS | The maximum time a record may stay in the sink before being flushed | N/A |
198+
| maxRecordSizeInBytes | The maximum record size that the sink will accept, records larger than this will be automatically rejected | N/A |
199+
200+
## Limitations
201+
202+
* Currently the sink does not support exactly-once semantics
203+
204+
42205
## Compatibility
43206

44207
- All projects in this repo are tested with all [active LTS versions](https://github.com/ClickHouse/ClickHouse/pulls?q=is%3Aopen+is%3Apr+label%3Arelease) of ClickHouse.
45208
- [Support policy](https://github.com/ClickHouse/ClickHouse/blob/master/SECURITY.md#security-change-log-and-support)
46-
- We recommend to upgrade connector continuously to not miss security fixes and new improvements
209+
- We recommend upgrading the connector continuously to not miss security fixes and new improvements
47210
- If you have an issue with migration - create and issue and we will respond!
48211

49212
## Contributing

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,10 +206,15 @@ void SimplePOJODataTest() throws Exception {
206206
"integerObject Int32," +
207207
"longPrimitive Int64," +
208208
"longObject Int64," +
209+
"bigInteger128 Int128," +
210+
"bigInteger256 Int256," +
209211
"floatPrimitive Float," +
210212
"floatObject Float," +
211213
"doublePrimitive Double," +
212214
"doubleObject Double," +
215+
"booleanPrimitive Boolean," +
216+
"booleanObject Boolean," +
217+
"str String," +
213218
") " +
214219
"ENGINE = MergeTree " +
215220
"ORDER BY (longPrimitive); ";
@@ -464,10 +469,15 @@ void SimplePOJODataTooManyPartsTest() throws Exception {
464469
"integerObject Int32," +
465470
"longPrimitive Int64," +
466471
"longObject Int64," +
472+
"bigInteger128 Int128," +
473+
"bigInteger256 Int256," +
467474
"floatPrimitive Float," +
468475
"floatObject Float," +
469476
"doublePrimitive Double," +
470477
"doubleObject Double," +
478+
"booleanPrimitive Boolean," +
479+
"booleanObject Boolean," +
480+
"str String," +
471481
") " +
472482
"ENGINE = MergeTree " +
473483
"ORDER BY (longPrimitive) " +

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/convertor/SimplePOJOConvertor.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,19 @@ public void instrument(OutputStream out, SimplePOJO input) throws IOException {
2323
Serialize.writeInt64(out, input.getLongPrimitive(), false, false, ClickHouseDataType.Int64, false, "longPrimitive");
2424
Serialize.writeInt64(out, input.getLongObject(), false, false, ClickHouseDataType.Int64, false, "longObject");
2525

26+
Serialize.writeInt128(out, input.getBigInteger128(), false, false, ClickHouseDataType.Int128, false, "bigInteger128");
27+
Serialize.writeInt256(out, input.getBigInteger256(), false, false, ClickHouseDataType.Int256, false, "bigInteger256");
28+
2629
Serialize.writeFloat32(out, input.getFloatPrimitive(), false, false, ClickHouseDataType.Float32, false, "floatPrimitive");
2730
Serialize.writeFloat32(out, input.getFloatObject(), false, false, ClickHouseDataType.Float32, false, "floatObject");
2831

2932
Serialize.writeFloat64(out, input.getDoublePrimitive(), false, false, ClickHouseDataType.Float64, false, "doublePrimitive");
3033
Serialize.writeFloat64(out, input.getDoubleObject(), false, false, ClickHouseDataType.Float64, false, "doubleObject");
34+
35+
Serialize.writeBoolean(out, input.isBooleanPrimitive(), false, false, ClickHouseDataType.Bool, false, "booleanPrimitive");
36+
Serialize.writeBoolean(out, input.getBooleanObject(), false, false, ClickHouseDataType.Bool, false, "booleanObject");
37+
38+
Serialize.writeString(out, input.getStr(), false, false, ClickHouseDataType.String, false, "String");
39+
3140
}
3241
}

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/pojo/SimplePOJO.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package org.apache.flink.connector.clickhouse.sink.pojo;
22

3+
import java.math.BigInteger;
4+
35
public class SimplePOJO {
46

57
private byte bytePrimitive;
@@ -20,6 +22,14 @@ public class SimplePOJO {
2022
private double doublePrimitive;
2123
private Double doubleObject;
2224

25+
private boolean booleanPrimitive;
26+
private Boolean booleanObject;
27+
28+
private String str;
29+
30+
private BigInteger bigInteger128;
31+
private BigInteger bigInteger256;
32+
2333
public SimplePOJO(int index) {
2434
this.bytePrimitive = Byte.MIN_VALUE;
2535
this.byteObject = Byte.MAX_VALUE;
@@ -38,6 +48,14 @@ public SimplePOJO(int index) {
3848

3949
this.doublePrimitive = Double.MIN_VALUE;
4050
this.doubleObject = Double.MAX_VALUE;
51+
52+
this.booleanPrimitive = true;
53+
this.booleanObject = Boolean.FALSE;
54+
55+
this.str = "str" + longPrimitive;
56+
57+
this.bigInteger128 = BigInteger.valueOf(longPrimitive);
58+
this.bigInteger256 = BigInteger.valueOf(longPrimitive);
4159
}
4260

4361
public byte getBytePrimitive() {
@@ -87,4 +105,14 @@ public double getDoublePrimitive() {
87105
public Double getDoubleObject() {
88106
return doubleObject;
89107
}
108+
109+
public boolean isBooleanPrimitive() { return booleanPrimitive; }
110+
111+
public Boolean getBooleanObject() { return booleanObject; }
112+
113+
public String getStr() { return str; }
114+
115+
public BigInteger getBigInteger128() { return bigInteger128; }
116+
117+
public BigInteger getBigInteger256() { return bigInteger256; }
90118
}

flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Serialize.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import java.io.IOException;
1111
import java.io.OutputStream;
1212
import java.lang.reflect.Method;
13+
import java.math.BigInteger;
1314
import java.time.LocalDate;
1415
import java.time.ZoneId;
1516
import java.time.ZonedDateTime;
@@ -168,6 +169,20 @@ public static void writeInt64(OutputStream out, Long value, boolean defaultsSupp
168169
}
169170
}
170171

172+
// Int128
173+
public static void writeInt128(OutputStream out, BigInteger value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
174+
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
175+
BinaryStreamUtils.writeInt128(out, SerializerUtils.convertToBigInteger(value));
176+
}
177+
}
178+
179+
// Int256
180+
public static void writeInt256(OutputStream out, BigInteger value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
181+
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
182+
BinaryStreamUtils.writeInt256(out, SerializerUtils.convertToBigInteger(value));
183+
}
184+
}
185+
171186
// Float32
172187
public static void writeFloat32(OutputStream out, Float value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
173188
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
@@ -182,4 +197,11 @@ public static void writeFloat64(OutputStream out, Double value, boolean defaults
182197
}
183198
}
184199

200+
// Boolean
201+
public static void writeBoolean(OutputStream out, Boolean value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
202+
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
203+
BinaryStreamUtils.writeBoolean(out, value);
204+
}
205+
}
206+
185207
}

0 commit comments

Comments
 (0)