Skip to content

Commit c5159a2

Browse files
authored
[Improve][Connector-V2][MongoDB] Support to convert to double from any numeric type (apache#6997)
1 parent a9e0f67 commit c5159a2

File tree

5 files changed

+187
-60
lines changed

5 files changed

+187
-60
lines changed

release-note.md

+1
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
- [Connector-v2] [File] Fix WriteStrategy parallel writing thread unsafe issue #5546
5656
- [Connector-v2] [File] Inject FileSystem to OrcWriteStrategy
5757
- [Connector-v2] [File] Support assign encoding for file source/sink (#5973)
58+
- [Connector-v2] [Mongodb] Support converting any value that MongoDB stores internally as a numeric type to double (#6997)
5859

5960
### Zeta(ST-Engine)
6061

seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/BsonToRowDataConverters.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,7 @@ private static boolean convertToBoolean(BsonValue bsonValue) {
353353
}
354354

355355
private static double convertToDouble(BsonValue bsonValue) {
356-
if (bsonValue.isDouble()) {
356+
if (bsonValue.isNumber()) {
357357
return bsonValue.asNumber().doubleValue();
358358
}
359359
throw new MongodbConnectorException(

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java

+81-59
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import com.mongodb.client.MongoCollection;
3737
import com.mongodb.client.MongoCursor;
3838
import com.mongodb.client.model.Sorts;
39+
import com.mongodb.client.result.InsertManyResult;
3940
import lombok.extern.slf4j.Slf4j;
4041

4142
import java.time.Duration;
@@ -60,6 +61,9 @@ public abstract class AbstractMongodbIT extends TestSuiteBase implements TestRes
6061

6162
protected static final List<Document> TEST_NULL_DATASET = generateTestDataSetWithNull(10);
6263

64+
protected static final List<Document> TEST_DOUBLE_DATASET =
65+
generateTestDataSetWithPresets(5, Arrays.asList(44.0d, 44.1d, 44.2d, 44.3d, 44.4d));
66+
6367
protected static final String MONGODB_IMAGE = "mongo:latest";
6468

6569
protected static final String MONGODB_CONTAINER_HOST = "e2e_mongodb";
@@ -76,6 +80,10 @@ public abstract class AbstractMongodbIT extends TestSuiteBase implements TestRes
7680

7781
protected static final String MONGODB_NULL_TABLE_RESULT = "test_null_op_db_result";
7882

83+
protected static final String MONGODB_DOUBLE_TABLE = "test_double_op_db";
84+
85+
protected static final String MONGODB_DOUBLE_TABLE_RESULT = "test_double_op_db_result";
86+
7987
protected static final String MONGODB_MATCH_RESULT_TABLE = "test_match_op_result_db";
8088

8189
protected static final String MONGODB_SPLIT_RESULT_TABLE = "test_split_op_result_db";
@@ -105,20 +113,10 @@ public void initConnection() {
105113
}
106114

107115
protected void initSourceData() {
108-
MongoCollection<Document> sourceMatchTable =
109-
client.getDatabase(MONGODB_DATABASE).getCollection(MONGODB_MATCH_TABLE);
110-
sourceMatchTable.deleteMany(new Document());
111-
sourceMatchTable.insertMany(TEST_MATCH_DATASET);
112-
113-
MongoCollection<Document> sourceSplitTable =
114-
client.getDatabase(MONGODB_DATABASE).getCollection(MONGODB_SPLIT_TABLE);
115-
sourceSplitTable.deleteMany(new Document());
116-
sourceSplitTable.insertMany(TEST_SPLIT_DATASET);
117-
118-
MongoCollection<Document> sourceNullTable =
119-
client.getDatabase(MONGODB_DATABASE).getCollection(MONGODB_NULL_TABLE);
120-
sourceNullTable.deleteMany(new Document());
121-
sourceNullTable.insertMany(TEST_NULL_DATASET);
116+
prepareInitDataInCollection(MONGODB_MATCH_TABLE, TEST_MATCH_DATASET);
117+
prepareInitDataInCollection(MONGODB_SPLIT_TABLE, TEST_SPLIT_DATASET);
118+
prepareInitDataInCollection(MONGODB_NULL_TABLE, TEST_NULL_DATASET);
119+
prepareInitDataInCollection(MONGODB_DOUBLE_TABLE, TEST_DOUBLE_DATASET);
122120
}
123121

124122
protected void clearDate(String table) {
@@ -129,51 +127,7 @@ public static List<Document> generateTestDataSet(int count) {
129127
List<Document> dataSet = new ArrayList<>();
130128

131129
for (int i = 0; i < count; i++) {
132-
dataSet.add(
133-
new Document(
134-
"c_map",
135-
new Document("OQBqH", randomString())
136-
.append("rkvlO", randomString())
137-
.append("pCMEX", randomString())
138-
.append("DAgdj", randomString())
139-
.append("dsJag", randomString()))
140-
.append(
141-
"c_array",
142-
Arrays.asList(
143-
RANDOM.nextInt(),
144-
RANDOM.nextInt(),
145-
RANDOM.nextInt(),
146-
RANDOM.nextInt(),
147-
RANDOM.nextInt()))
148-
.append("c_string", randomString())
149-
.append("c_boolean", RANDOM.nextBoolean())
150-
.append("c_int", i)
151-
.append("c_bigint", RANDOM.nextLong())
152-
.append("c_double", RANDOM.nextDouble() * Double.MAX_VALUE)
153-
.append(
154-
"c_row",
155-
new Document(
156-
"c_map",
157-
new Document("OQBqH", randomString())
158-
.append("rkvlO", randomString())
159-
.append("pCMEX", randomString())
160-
.append("DAgdj", randomString())
161-
.append("dsJag", randomString()))
162-
.append(
163-
"c_array",
164-
Arrays.asList(
165-
RANDOM.nextInt(),
166-
RANDOM.nextInt(),
167-
RANDOM.nextInt(),
168-
RANDOM.nextInt(),
169-
RANDOM.nextInt()))
170-
.append("c_string", randomString())
171-
.append("c_boolean", RANDOM.nextBoolean())
172-
.append("c_int", RANDOM.nextInt())
173-
.append("c_bigint", RANDOM.nextLong())
174-
.append(
175-
"c_double",
176-
RANDOM.nextDouble() * Double.MAX_VALUE)));
130+
dataSet.add(generateData(i, RANDOM.nextDouble() * Double.MAX_VALUE));
177131
}
178132
return dataSet;
179133
}
@@ -195,6 +149,17 @@ public static List<Document> generateTestDataSetWithNull(int count) {
195149
return dataSet;
196150
}
197151

152+
public static List<Document> generateTestDataSetWithPresets(
153+
int count, List<Double> doublePresets) {
154+
List<Document> dataSet = new ArrayList<>(count);
155+
156+
for (int i = 0; i < count; i++) {
157+
dataSet.add(generateData(i, doublePresets.get(i)));
158+
}
159+
160+
return dataSet;
161+
}
162+
198163
protected static String randomString() {
199164
int length = RANDOM.nextInt(10) + 1;
200165
StringBuilder sb = new StringBuilder(length);
@@ -205,6 +170,63 @@ protected static String randomString() {
205170
return sb.toString();
206171
}
207172

173+
private static Document generateData(int intPreset, Double doublePreset) {
174+
return new Document(
175+
"c_map",
176+
new Document("OQBqH", randomString())
177+
.append("rkvlO", randomString())
178+
.append("pCMEX", randomString())
179+
.append("DAgdj", randomString())
180+
.append("dsJag", randomString()))
181+
.append(
182+
"c_array",
183+
Arrays.asList(
184+
RANDOM.nextInt(),
185+
RANDOM.nextInt(),
186+
RANDOM.nextInt(),
187+
RANDOM.nextInt(),
188+
RANDOM.nextInt()))
189+
.append("c_string", randomString())
190+
.append("c_boolean", RANDOM.nextBoolean())
191+
.append("c_int", intPreset)
192+
.append("c_bigint", RANDOM.nextLong())
193+
.append("c_double", doublePreset)
194+
.append(
195+
"c_row",
196+
new Document(
197+
"c_map",
198+
new Document("OQBqH", randomString())
199+
.append("rkvlO", randomString())
200+
.append("pCMEX", randomString())
201+
.append("DAgdj", randomString())
202+
.append("dsJag", randomString()))
203+
.append(
204+
"c_array",
205+
Arrays.asList(
206+
RANDOM.nextInt(),
207+
RANDOM.nextInt(),
208+
RANDOM.nextInt(),
209+
RANDOM.nextInt(),
210+
RANDOM.nextInt()))
211+
.append("c_string", randomString())
212+
.append("c_boolean", RANDOM.nextBoolean())
213+
.append("c_int", RANDOM.nextInt())
214+
.append("c_bigint", RANDOM.nextLong())
215+
.append("c_double", RANDOM.nextDouble() * Double.MAX_VALUE));
216+
}
217+
218+
private void prepareInitDataInCollection(String collection, List<Document> dataSet) {
219+
MongoCollection<Document> source =
220+
client.getDatabase(MONGODB_DATABASE).getCollection(collection);
221+
source.deleteMany(new Document());
222+
223+
InsertManyResult result = source.insertMany(dataSet);
224+
225+
if (result.getInsertedIds().size() != dataSet.size()) {
226+
throw new IllegalStateException("Insertion count mismatch");
227+
}
228+
}
229+
208230
protected List<Document> readMongodbData(String collection) {
209231
MongoCollection<Document> sinkTable =
210232
client.getDatabase(MONGODB_DATABASE).getCollection(collection);

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java

+16
Original file line numberDiff line numberDiff line change
@@ -221,4 +221,20 @@ public void testTransactionSinkAndUpsert(TestContainer container)
221221
clearDate(MONGODB_TRANSACTION_SINK_TABLE);
222222
clearDate(MONGODB_TRANSACTION_UPSERT_TABLE);
223223
}
224+
225+
@TestTemplate
226+
public void testMongodbDoubleValue(TestContainer container)
227+
throws IOException, InterruptedException {
228+
Container.ExecResult assertSinkResult = container.executeJob("/mongodb_double_value.conf");
229+
Assertions.assertEquals(0, assertSinkResult.getExitCode(), assertSinkResult.getStderr());
230+
231+
Assertions.assertIterableEquals(
232+
TEST_DOUBLE_DATASET.stream()
233+
.peek(e -> e.remove("_id"))
234+
.collect(Collectors.toList()),
235+
readMongodbData(MONGODB_DOUBLE_TABLE_RESULT).stream()
236+
.peek(e -> e.remove("_id"))
237+
.collect(Collectors.toList()));
238+
clearDate(MONGODB_DOUBLE_TABLE_RESULT);
239+
}
224240
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
env {
19+
parallelism = 1
20+
job.mode = "BATCH"
21+
#spark config
22+
spark.app.name = "SeaTunnel"
23+
spark.executor.instances = 1
24+
spark.executor.cores = 1
25+
spark.executor.memory = "1g"
26+
spark.master = local
27+
}
28+
29+
source {
30+
MongoDB {
31+
uri = "mongodb://e2e_mongodb:27017/test_db"
32+
database = "test_db"
33+
collection = "test_double_op_db"
34+
result_table_name = "mongodb_table"
35+
cursor.no-timeout = true
36+
fetch.size = 1000
37+
max.time-min = 100
38+
schema = {
39+
fields {
40+
c_map = "map<string, string>"
41+
c_array = "array<int>"
42+
c_string = string
43+
c_boolean = boolean
44+
c_int = int
45+
c_bigint = bigint
46+
c_double = double
47+
c_row = {
48+
c_map = "map<string, string>"
49+
c_array = "array<int>"
50+
c_string = string
51+
c_boolean = boolean
52+
c_int = int
53+
c_bigint = bigint
54+
c_double = double
55+
}
56+
}
57+
}
58+
}
59+
}
60+
61+
sink {
62+
MongoDB {
63+
uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true"
64+
database = "test_db"
65+
collection = "test_double_op_db_result"
66+
source_table_name = "mongodb_table"
67+
schema = {
68+
fields {
69+
c_map = "map<string, string>"
70+
c_array = "array<int>"
71+
c_string = string
72+
c_boolean = boolean
73+
c_int = int
74+
c_bigint = bigint
75+
c_double = double
76+
c_row = {
77+
c_map = "map<string, string>"
78+
c_array = "array<int>"
79+
c_string = string
80+
c_boolean = boolean
81+
c_int = int
82+
c_bigint = bigint
83+
c_double = double
84+
}
85+
}
86+
}
87+
}
88+
}

0 commit comments

Comments
 (0)