Skip to content

Commit 97a0fe4

Browse files
committed
Adding POJO support
1 parent c160c68 commit 97a0fe4

File tree

14 files changed

+644
-27
lines changed

14 files changed

+644
-27
lines changed

flink-connector-clickhouse-base/build.gradle.kts

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,16 @@ val scalaVersion = "2.13.12"
1212

1313
repositories {
1414
// Use Maven Central for resolving dependencies.
15+
mavenLocal()
1516
mavenCentral()
1617
}
1718

1819
extra.apply {
19-
set("clickHouseDriverVersion", "0.8.5")
20+
set("clickHouseDriverVersion", "0.8.6-SNAPSHOT")
2021
set("flinkVersion", "2.0.0")
2122
set("log4jVersion","2.17.2")
2223
set("testContainersVersion", "1.21.0")
24+
set("byteBuddyVersion", "1.17.5")
2325
}
2426

2527
dependencies {
@@ -28,6 +30,8 @@ dependencies {
2830

2931
testRuntimeOnly("org.junit.platform:junit-platform-launcher")
3032

33+
implementation("net.bytebuddy:byte-buddy:${project.extra["byteBuddyVersion"]}")
34+
implementation("net.bytebuddy:byte-buddy-agent:${project.extra["byteBuddyVersion"]}")
3135
// This dependency is used by the application.
3236
implementation(libs.guava)
3337
implementation("org.scala-lang:scala-library:$scalaVersion")
@@ -39,10 +43,7 @@ dependencies {
3943
implementation("org.apache.logging.log4j:log4j-core:${project.extra["log4jVersion"]}")
4044

4145
// ClickHouse Client Libraries
42-
implementation("com.clickhouse:clickhouse-client:${project.extra["clickHouseDriverVersion"]}")
43-
implementation("com.clickhouse:clickhouse-http-client:${project.extra["clickHouseDriverVersion"]}")
44-
implementation("com.clickhouse:clickhouse-data:${project.extra["clickHouseDriverVersion"]}")
45-
implementation("com.clickhouse:client-v2:${project.extra["clickHouseDriverVersion"]}")
46+
implementation("com.clickhouse:client-v2:${project.extra["clickHouseDriverVersion"]}:all")
4647
// Apache Flink Libraries
4748
implementation("org.apache.flink:flink-connector-base:${project.extra["flinkVersion"]}")
4849
implementation("org.apache.flink:flink-streaming-java:${project.extra["flinkVersion"]}")
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
package com.clickhouse.utils;
2+
3+
import com.clickhouse.client.api.data_formats.internal.SerializerUtils;
4+
import com.clickhouse.data.ClickHouseDataType;
5+
import com.clickhouse.data.format.BinaryStreamUtils;
6+
7+
import java.io.IOException;
8+
import java.io.OutputStream;
9+
import java.lang.reflect.Method;
10+
import java.time.LocalDate;
11+
import java.time.ZoneId;
12+
import java.time.ZonedDateTime;
13+
import java.util.HashMap;
14+
import java.util.Map;
15+
16+
//import static com.clickhouse.client.api.data_formats.internal.SerializerUtils.writeDate;
17+
18+
public class Serialize {
19+
20+
public static boolean writePrimitiveValuePreamble(OutputStream out, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
21+
// since it is primitive we always have a value that is not null
22+
if (defaultsSupport) {
23+
SerializerUtils.writeNonNull(out);
24+
if (isNullable) {
25+
SerializerUtils.writeNonNull(out);
26+
}
27+
} else if (isNullable) {
28+
SerializerUtils.writeNonNull(out);
29+
}
30+
return true;
31+
}
32+
public static boolean writeValuePreamble(OutputStream out, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column, Object value) throws IOException {
33+
if (defaultsSupport) {
34+
if (value != null) {
35+
SerializerUtils.writeNonNull(out);
36+
if (isNullable) {
37+
SerializerUtils.writeNonNull(out);
38+
}
39+
} else {
40+
if (hasDefault) {
41+
SerializerUtils.writeNull(out);
42+
return false;
43+
}
44+
45+
if (isNullable) {
46+
SerializerUtils.writeNonNull(out);
47+
SerializerUtils.writeNull(out);
48+
return false;
49+
}
50+
51+
if (dataType == ClickHouseDataType.Array) {
52+
SerializerUtils.writeNonNull(out);
53+
} else if (dataType != ClickHouseDataType.Dynamic) {
54+
throw new IllegalArgumentException(String.format("An attempt to write null into not nullable column '%s'", column));
55+
}
56+
}
57+
} else if (isNullable) {
58+
if (value == null) {
59+
SerializerUtils.writeNull(out);
60+
return false;
61+
}
62+
63+
SerializerUtils.writeNonNull(out);
64+
} else if (value == null) {
65+
if (dataType == ClickHouseDataType.Array) {
66+
SerializerUtils.writeNonNull(out);
67+
} else if (dataType != ClickHouseDataType.Dynamic) {
68+
throw new IllegalArgumentException(String.format("An attempt to write null into not nullable column '%s'", column));
69+
}
70+
}
71+
return true;
72+
}
73+
74+
public static String convertToString(Object value) {
75+
return java.lang.String.valueOf(value);
76+
}
77+
78+
public static Integer convertToInteger(Object value) {
79+
if (value instanceof Integer) {
80+
return (Integer) value;
81+
} else if (value instanceof Number) {
82+
return ((Number) value).intValue();
83+
} else if (value instanceof String) {
84+
return Integer.parseInt((String) value);
85+
} else if (value instanceof Boolean) {
86+
return ((Boolean) value) ? 1 : 0;
87+
} else {
88+
throw new IllegalArgumentException("Cannot convert " + value + " to Integer");
89+
}
90+
}
91+
92+
public static Map<ClickHouseDataType, Method> mapClickHouseTypeToMethod() {
93+
Map<ClickHouseDataType, Method> map = new HashMap<>();
94+
for (Method method : Serialize.class.getMethods()) {
95+
String name = method.getName();
96+
if (name.startsWith("write")) {
97+
String chType = name.substring("write".length());
98+
try {
99+
ClickHouseDataType type = ClickHouseDataType.valueOf(chType);
100+
map.put(type, method);
101+
} catch (IllegalArgumentException e) {
102+
System.out.println(e.getMessage());
103+
}
104+
}
105+
}
106+
return map;
107+
}
108+
109+
/**
110+
*
111+
*/
112+
113+
// Method structure write[ClickHouse Type](OutputStream, Java type, ... )
114+
// Date support
115+
public static void writeDate(OutputStream out, LocalDate value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
116+
System.out.println("writeDate");
117+
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
118+
SerializerUtils.writeDate(out, value, ZoneId.of("UTC")); // TODO: check
119+
}
120+
}
121+
122+
public static void writeDate(OutputStream out, ZonedDateTime value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
123+
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
124+
SerializerUtils.writeDate(out, value, ZoneId.of("UTC")); // TODO: check
125+
}
126+
}
127+
128+
// clickhouse type String support
129+
public static void writeString(OutputStream out, String value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
130+
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
131+
BinaryStreamUtils.writeString(out, convertToString(value));
132+
}
133+
}
134+
135+
public static void writeFixedString(OutputStream out, String value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, int size, String column) throws IOException {
136+
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
137+
BinaryStreamUtils.writeFixedString(out, convertToString(value), size);
138+
}
139+
}
140+
141+
// Int8
142+
public static void writeInt8(OutputStream out, Byte value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
143+
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
144+
BinaryStreamUtils.writeInt8(out, convertToInteger(value));
145+
}
146+
}
147+
148+
// Int16
149+
public static void writeInt16(OutputStream out, Short value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
150+
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
151+
BinaryStreamUtils.writeInt16(out, convertToInteger(value));
152+
}
153+
}
154+
155+
// Int32
156+
public static void writeInt32(OutputStream out, Integer value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
157+
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
158+
BinaryStreamUtils.writeInt32(out, convertToInteger(value));
159+
}
160+
}
161+
162+
// Int64
163+
public static void writeInt64(OutputStream out, Long value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
164+
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
165+
BinaryStreamUtils.writeInt64(out, convertToInteger(value));
166+
}
167+
}
168+
169+
// Float32
170+
public static void writeFloat32(OutputStream out, Float value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
171+
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
172+
BinaryStreamUtils.writeFloat32(out, value);
173+
}
174+
}
175+
176+
// Float64
177+
public static void writeFloat64(OutputStream out, Double value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
178+
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
179+
BinaryStreamUtils.writeFloat64(out, value);
180+
}
181+
}
182+
183+
}

flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/convertor/ClickHouseConvertor.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@
77
import org.slf4j.Logger;
88
import org.slf4j.LoggerFactory;
99

10+
1011
public class ClickHouseConvertor<InputT> implements ElementConverter<InputT, ClickHousePayload> {
1112
private static final Logger LOG = LoggerFactory.getLogger(ClickHouseConvertor.class);
1213

14+
POJOConvertor<InputT> pojoConvertor = null;
1315
enum Types {
1416
STRING,
1517
POJO,
@@ -30,6 +32,15 @@ public ClickHouseConvertor(Class<?> clazz) {
3032
}
3133
}
3234

35+
public ClickHouseConvertor(Class<?> clazz, POJOConvertor<InputT> pojoConvertor) {
36+
if (clazz == null) {
37+
throw new IllegalArgumentException("clazz must not be not null");
38+
} else {
39+
type = Types.POJO;
40+
this.pojoConvertor = pojoConvertor;
41+
}
42+
}
43+
3344
@Override
3445
public ClickHousePayload apply( InputT o, SinkWriter.Context context) {
3546
if (o == null) {
@@ -47,8 +58,13 @@ public ClickHousePayload apply( InputT o, SinkWriter.Context context) {
4758
return new ClickHousePayload((payload + "\n").getBytes());
4859
}
4960
if (type == Types.POJO) {
50-
// TODO Convert to byte stream
51-
return null;
61+
// TODO Convert POJO to bytes
62+
try {
63+
byte[] payload = this.pojoConvertor.convert(o);
64+
return new ClickHousePayload(payload);
65+
} catch (Exception e) {
66+
return new ClickHousePayload(null);
67+
}
5268
}
5369
throw new IllegalArgumentException("unable to convert " + o + " to " + type);
5470
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package org.apache.flink.connector.clickhouse.convertor;
2+
3+
import java.io.ByteArrayOutputStream;
4+
import java.io.IOException;
5+
import java.io.OutputStream;
6+
import java.io.Serializable;
7+
8+
public abstract class POJOConvertor<InputT> implements Serializable {
9+
public abstract void instrument(OutputStream out, InputT input) throws IOException;
10+
11+
public byte[] convert(InputT input) throws IOException {
12+
ByteArrayOutputStream out = new ByteArrayOutputStream();
13+
instrument(out, input);
14+
return out.toByteArray();
15+
}
16+
}

flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/data/ClickHousePayload.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package org.apache.flink.connector.clickhouse.data;
22

3-
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer;
43
import org.slf4j.Logger;
54
import org.slf4j.LoggerFactory;
65

flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseClientConfig.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import com.clickhouse.client.api.Client;
44
import com.clickhouse.client.api.ClientConfigProperties;
5-
import org.apache.flink.connector.clickhouse.data.ClickHousePayload;
65
import org.slf4j.Logger;
76
import org.slf4j.LoggerFactory;
87

@@ -16,25 +15,40 @@ public class ClickHouseClientConfig implements Serializable {
1615
private final String password;
1716
private final String database;
1817
private final String tableName;
18+
// private List<Class<?>> classToReisterList = null;
19+
// private List<TableSchema> tableSchemaList = null;
1920

2021
public ClickHouseClientConfig(String url, String username, String password, String database, String tableName) {
2122
this.url = url;
2223
this.username = username;
2324
this.password = password;
2425
this.database = database;
2526
this.tableName = tableName;
27+
// this.classToReisterList = new ArrayList<>();
28+
// this.tableSchemaList = new ArrayList<>();
2629
}
2730

2831
public Client createClient(String database) {
29-
return new Client.Builder()
32+
Client client = new Client.Builder()
3033
.addEndpoint(url)
3134
.setUsername(username)
3235
.setPassword(password)
3336
.setDefaultDatabase(database)
3437
.setOption(ClientConfigProperties.ASYNC_OPERATIONS.getKey(), "true")
3538
.build();
39+
// if (classToReisterList != null) {
40+
// for (int index = 0; index < classToReisterList.size(); index++) {
41+
// client.register(classToReisterList.get(index), tableSchemaList.get(index));
42+
// }
43+
// }
44+
return client;
3645
}
3746

47+
// public void registerClass(Class<?> clazz, TableSchema tableSchema) {
48+
// classToReisterList.add(clazz);
49+
// tableSchemaList.add(tableSchema);
50+
// }
51+
3852
public Client createClient() {
3953
return createClient(this.database);
4054
}

0 commit comments

Comments
 (0)