Skip to content

Commit b9a7553

Browse files
committed
Adding support for UUID
1 parent fe9347e commit b9a7553

File tree

5 files changed

+68
-83
lines changed

5 files changed

+68
-83
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ Planned for a future release — a complete end-to-end example will be added onc
164164
| long/Long | Time64 || N/A |
165165
| byte/Byte | Enum8 || Serialize.writeInt8 |
166166
| int/Integer | Enum16 || Serialize.writeInt16 |
167+
| java.util.UUID | UUID || Serialize.writeIntUUID |
167168
| String | JSON || N/A |
168169
| Array<Type> | Array<Type> || N/A |
169170
| Map<K,V> | Map<K,V> || N/A |

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java

Lines changed: 52 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,56 @@ public class ClickHouseSinkTests extends FlinkClusterTests {
4646

4747
static final int STREAM_PARALLELISM = 5;
4848

49+
private String createSimplePOJOTableSQL(String database, String tableName, int parts_to_throw_insert) {
50+
String createTable = createSimplePOJOTableSQL(database, tableName);
51+
return createTable.trim().substring(0, createTable.trim().length() - 1) + " " + String.format("SETTINGS parts_to_throw_insert = %d;", parts_to_throw_insert);
52+
}
53+
54+
private String createSimplePOJOTableSQL(String database, String tableName) {
55+
return "CREATE TABLE `" + database + "`.`" + tableName + "` (" +
56+
"bytePrimitive Int8," +
57+
"byteObject Int8," +
58+
"shortPrimitive Int16," +
59+
"shortObject Int16," +
60+
"intPrimitive Int32," +
61+
"integerObject Int32," +
62+
"longPrimitive Int64," +
63+
"longObject Int64," +
64+
"bigInteger128 Int128," +
65+
"bigInteger256 Int256," +
66+
"uint8Primitive UInt8," +
67+
"uint8Object UInt8," +
68+
"uint16Primitive UInt16," +
69+
"uint16Object UInt16," +
70+
"uint32Primitive UInt32," +
71+
"uint32Object UInt32," +
72+
"uint64Primitive UInt64," +
73+
"uint64Object UInt64," +
74+
"uint128Object UInt128," +
75+
"uint256Object UInt256," +
76+
"decimal Decimal(10,5)," +
77+
"decimal32 Decimal32(9)," +
78+
"decimal64 Decimal64(18)," +
79+
"decimal128 Decimal128(38)," +
80+
"decimal256 Decimal256(76)," +
81+
"floatPrimitive Float," +
82+
"floatObject Float," +
83+
"doublePrimitive Double," +
84+
"doubleObject Double," +
85+
"booleanPrimitive Boolean," +
86+
"booleanObject Boolean," +
87+
"str String," +
88+
"fixedStr FixedString(10)," +
89+
"v_date Date," +
90+
"v_date32 Date32," +
91+
"v_dateTime DateTime," +
92+
"v_dateTime64 DateTime64," +
93+
"uuid UUID," +
94+
") " +
95+
"ENGINE = MergeTree " +
96+
"ORDER BY (longPrimitive); ";
97+
}
98+
4999
private int executeAsyncJob(StreamExecutionEnvironment env, String tableName, int numIterations, int expectedRows) throws Exception {
50100
JobClient jobClient = env.executeAsync("Read GZipped CSV with FileSource");
51101
int rows = 0;
@@ -197,47 +247,7 @@ void SimplePOJODataTest() throws Exception {
197247
String dropTable = String.format("DROP TABLE IF EXISTS `%s`.`%s`", getDatabase(), tableName);
198248
ClickHouseServerForTests.executeSql(dropTable);
199249
// create table
200-
String tableSql = "CREATE TABLE `" + getDatabase() + "`.`" + tableName + "` (" +
201-
"bytePrimitive Int8," +
202-
"byteObject Int8," +
203-
"shortPrimitive Int16," +
204-
"shortObject Int16," +
205-
"intPrimitive Int32," +
206-
"integerObject Int32," +
207-
"longPrimitive Int64," +
208-
"longObject Int64," +
209-
"bigInteger128 Int128," +
210-
"bigInteger256 Int256," +
211-
"uint8Primitive UInt8," +
212-
"uint8Object UInt8," +
213-
"uint16Primitive UInt16," +
214-
"uint16Object UInt16," +
215-
"uint32Primitive UInt32," +
216-
"uint32Object UInt32," +
217-
"uint64Primitive UInt64," +
218-
"uint64Object UInt64," +
219-
"uint128Object UInt128," +
220-
"uint256Object UInt256," +
221-
"decimal Decimal(10,5)," +
222-
"decimal32 Decimal32(9)," +
223-
"decimal64 Decimal64(18)," +
224-
"decimal128 Decimal128(38)," +
225-
"decimal256 Decimal256(76)," +
226-
"floatPrimitive Float," +
227-
"floatObject Float," +
228-
"doublePrimitive Double," +
229-
"doubleObject Double," +
230-
"booleanPrimitive Boolean," +
231-
"booleanObject Boolean," +
232-
"str String," +
233-
"fixedStr FixedString(10)," +
234-
"v_date Date," +
235-
"v_date32 Date32," +
236-
"v_dateTime DateTime," +
237-
"v_dateTime64 DateTime64," +
238-
") " +
239-
"ENGINE = MergeTree " +
240-
"ORDER BY (longPrimitive); ";
250+
String tableSql = createSimplePOJOTableSQL(getDatabase(), tableName);
241251
ClickHouseServerForTests.executeSql(tableSql);
242252

243253

@@ -485,48 +495,7 @@ void SimplePOJODataTooManyPartsTest() throws Exception {
485495
String dropTable = String.format("DROP TABLE IF EXISTS `%s`.`%s`", getDatabase(), tableName);
486496
ClickHouseServerForTests.executeSql(dropTable);
487497
// create table
488-
String tableSql = "CREATE TABLE `" + getDatabase() + "`.`" + tableName + "` (" +
489-
"bytePrimitive Int8," +
490-
"byteObject Int8," +
491-
"shortPrimitive Int16," +
492-
"shortObject Int16," +
493-
"intPrimitive Int32," +
494-
"integerObject Int32," +
495-
"longPrimitive Int64," +
496-
"longObject Int64," +
497-
"bigInteger128 Int128," +
498-
"bigInteger256 Int256," +
499-
"uint8Primitive UInt8," +
500-
"uint8Object UInt8," +
501-
"uint16Primitive UInt16," +
502-
"uint16Object UInt16," +
503-
"uint32Primitive UInt32," +
504-
"uint32Object UInt32," +
505-
"uint64Primitive UInt64," +
506-
"uint64Object UInt64," +
507-
"uint128Object UInt128," +
508-
"uint256Object UInt256," +
509-
"decimal Decimal(10,5)," +
510-
"decimal32 Decimal32(9)," +
511-
"decimal64 Decimal64(18)," +
512-
"decimal128 Decimal128(38)," +
513-
"decimal256 Decimal256(76)," +
514-
"floatPrimitive Float," +
515-
"floatObject Float," +
516-
"doublePrimitive Double," +
517-
"doubleObject Double," +
518-
"booleanPrimitive Boolean," +
519-
"booleanObject Boolean," +
520-
"str String," +
521-
"fixedStr FixedString(10)," +
522-
"v_date Date," +
523-
"v_date32 Date32," +
524-
"v_dateTime DateTime," +
525-
"v_dateTime64 DateTime64," +
526-
") " +
527-
"ENGINE = MergeTree " +
528-
"ORDER BY (longPrimitive) " +
529-
"SETTINGS parts_to_throw_insert = 10;";
498+
String tableSql = createSimplePOJOTableSQL(getDatabase(), tableName, 10);
530499
ClickHouseServerForTests.executeSql(tableSql);
531500
//ClickHouseServerForTests.executeSql(String.format("SYSTEM STOP MERGES `%s.%s`", getDatabase(), tableName));
532501

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/convertor/SimplePOJOConvertor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,5 +65,6 @@ public void instrument(OutputStream out, SimplePOJO input) throws IOException {
6565
Serialize.writeTimeDate(out, input.getDateTime(), false, false, ClickHouseDataType.DateTime, false, "v_dateTime");
6666
Serialize.writeTimeDate64(out, input.getDateTime64(), false, false, ClickHouseDataType.DateTime64, false, "v_dateTime64", 1);
6767

68+
Serialize.writeUUID(out, input.getUuid(), false, false, ClickHouseDataType.UUID, false, "uuid");
6869
}
6970
}

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/pojo/SimplePOJO.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import java.math.BigInteger;
55
import java.time.LocalDate;
66
import java.time.LocalDateTime;
7+
import java.util.UUID;
78

89
public class SimplePOJO {
910

@@ -61,6 +62,8 @@ public class SimplePOJO {
6162
private LocalDateTime dateTime;
6263
private LocalDateTime dateTime64;
6364

65+
private UUID uuid;
66+
6467
public SimplePOJO(int index) {
6568
this.bytePrimitive = Byte.MIN_VALUE;
6669
this.byteObject = Byte.MAX_VALUE;
@@ -115,6 +118,7 @@ public SimplePOJO(int index) {
115118
this.dateTime = LocalDateTime.now();
116119
this.dateTime64 = LocalDateTime.now();
117120

121+
this.uuid = UUID.randomUUID();
118122
}
119123

120124
public byte getBytePrimitive() {
@@ -215,4 +219,6 @@ public Double getDoubleObject() {
215219

216220
public LocalDateTime getDateTime64() { return dateTime64; }
217221

222+
public UUID getUuid() { return uuid; }
223+
218224
}

flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Serialize.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.time.ZonedDateTime;
1919
import java.util.HashMap;
2020
import java.util.Map;
21+
import java.util.UUID;
2122

2223
public class Serialize {
2324
private static final Logger LOG = LoggerFactory.getLogger(Serialize.class);
@@ -286,4 +287,11 @@ public static void writeBoolean(OutputStream out, Boolean value, boolean default
286287
}
287288
}
288289

290+
// UUID
291+
public static void writeUUID(OutputStream out, UUID value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
292+
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
293+
BinaryStreamUtils.writeUuid(out, value);
294+
}
295+
}
296+
289297
}

0 commit comments

Comments
 (0)