Skip to content

Commit 332ba5b

Browse files
authored
Pipe: support multiple path patterns under tree model (#16575)
* setup * reset * support multi for meta sync * apply review * add tests for schema sync with patterns * consider "isPrefixOrFullPath" * add more cases for PipePlanTreePatternParseVisitorTest * add more cases for PipeStatementTreePatternParseVisitorTest * add more cases for PipeConfigPhysicalPlanTreePatternParseVisitorTest * apply review * apply review * reset * reset * fix license header * fix tests * remove TODO
1 parent dd4a21c commit 332ba5b

File tree

22 files changed

+1336
-171
lines changed

22 files changed

+1336
-171
lines changed

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBTreePatternFormatIT.java

Lines changed: 322 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public void testPrefixPattern() throws Exception {
104104
}
105105

106106
@Test
107-
public void testIotdbPattern() throws Exception {
107+
public void testIoTDBPattern() throws Exception {
108108
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
109109

110110
final String receiverIp = receiverDataNode.getIp();
@@ -127,8 +127,6 @@ public void testIotdbPattern() throws Exception {
127127
final Map<String, String> connectorAttributes = new HashMap<>();
128128

129129
extractorAttributes.put("extractor.path", "root.**.d1.s*");
130-
// When path is set, pattern should be ignored
131-
extractorAttributes.put("extractor.pattern", "root");
132130
extractorAttributes.put("extractor.inclusion", "data.insert");
133131

134132
connectorAttributes.put("connector", "iotdb-thrift-connector");
@@ -158,7 +156,7 @@ public void testIotdbPattern() throws Exception {
158156
}
159157

160158
@Test
161-
public void testIotdbPatternWithLegacySyntax() throws Exception {
159+
public void testIoTDBPatternWithLegacySyntax() throws Exception {
162160
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
163161

164162
final String receiverIp = receiverDataNode.getIp();
@@ -209,4 +207,324 @@ public void testIotdbPatternWithLegacySyntax() throws Exception {
209207
expectedResSet);
210208
}
211209
}
210+
211+
@Test
212+
public void testMultiplePrefixPatternHistoricalData() throws Exception {
213+
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
214+
215+
final String receiverIp = receiverDataNode.getIp();
216+
final int receiverPort = receiverDataNode.getPort();
217+
218+
try (final SyncConfigNodeIServiceClient client =
219+
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
220+
final Map<String, String> extractorAttributes = new HashMap<>();
221+
final Map<String, String> processorAttributes = new HashMap<>();
222+
final Map<String, String> connectorAttributes = new HashMap<>();
223+
224+
extractorAttributes.put("extractor.pattern", "root.db.d1.s, root.db2.d1.s");
225+
extractorAttributes.put("extractor.inclusion", "data.insert");
226+
227+
connectorAttributes.put("connector", "iotdb-thrift-connector");
228+
connectorAttributes.put("connector.batch.enable", "false");
229+
connectorAttributes.put("connector.ip", receiverIp);
230+
connectorAttributes.put("connector.port", Integer.toString(receiverPort));
231+
232+
TestUtils.executeNonQueries(
233+
senderEnv,
234+
Arrays.asList(
235+
"insert into root.db.d1(time, s, s1) values (1, 1, 1)",
236+
"insert into root.db.d2(time, s) values (2, 2)",
237+
"insert into root.db2.d1(time, s) values (3, 3)"),
238+
null);
239+
awaitUntilFlush(senderEnv);
240+
241+
final TSStatus status =
242+
client.createPipe(
243+
new TCreatePipeReq("p1", connectorAttributes)
244+
.setExtractorAttributes(extractorAttributes)
245+
.setProcessorAttributes(processorAttributes));
246+
247+
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
248+
249+
Assert.assertEquals(
250+
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
251+
252+
final Set<String> expectedResSet = new HashSet<>();
253+
expectedResSet.add("1,null,1.0,1.0,");
254+
expectedResSet.add("3,3.0,null,null,");
255+
TestUtils.assertDataEventuallyOnEnv(
256+
receiverEnv,
257+
"select * from root.db2.**,root.db.**",
258+
"Time,root.db2.d1.s,root.db.d1.s,root.db.d1.s1,",
259+
expectedResSet);
260+
}
261+
}
262+
263+
@Test
264+
public void testMultipleIoTDBPatternHistoricalData() throws Exception {
265+
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
266+
267+
final String receiverIp = receiverDataNode.getIp();
268+
final int receiverPort = receiverDataNode.getPort();
269+
270+
try (final SyncConfigNodeIServiceClient client =
271+
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
272+
final Map<String, String> extractorAttributes = new HashMap<>();
273+
final Map<String, String> processorAttributes = new HashMap<>();
274+
final Map<String, String> connectorAttributes = new HashMap<>();
275+
276+
extractorAttributes.put("extractor.path", "root.db.**, root.db2.d1.*");
277+
extractorAttributes.put("extractor.inclusion", "data.insert");
278+
279+
connectorAttributes.put("connector", "iotdb-thrift-connector");
280+
connectorAttributes.put("connector.batch.enable", "false");
281+
connectorAttributes.put("connector.ip", receiverIp);
282+
connectorAttributes.put("connector.port", Integer.toString(receiverPort));
283+
284+
TestUtils.executeNonQueries(
285+
senderEnv,
286+
Arrays.asList(
287+
"insert into root.db.d1(time, s, s1) values (1, 1, 1)",
288+
"insert into root.db.d2(time, s) values (2, 2)",
289+
"insert into root.db2.d1(time, s, t) values (3, 3, 3)",
290+
"insert into root.db3.d1(time, s) values (4, 4)"),
291+
null);
292+
awaitUntilFlush(senderEnv);
293+
294+
final TSStatus status =
295+
client.createPipe(
296+
new TCreatePipeReq("p1", connectorAttributes)
297+
.setExtractorAttributes(extractorAttributes)
298+
.setProcessorAttributes(processorAttributes));
299+
300+
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
301+
302+
Assert.assertEquals(
303+
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
304+
305+
final Set<String> expectedResSet = new HashSet<>();
306+
expectedResSet.add("1,null,null,1.0,1.0,null,");
307+
expectedResSet.add("2,null,null,null,null,2.0,");
308+
expectedResSet.add("3,3.0,3.0,null,null,null,");
309+
TestUtils.assertDataEventuallyOnEnv(
310+
receiverEnv,
311+
"select * from root.db2.**,root.db.**",
312+
"Time,root.db2.d1.s,root.db2.d1.t,root.db.d1.s,root.db.d1.s1,root.db.d2.s,",
313+
expectedResSet);
314+
}
315+
}
316+
317+
@Test
318+
public void testMultipleHybridPatternHistoricalData() throws Exception {
319+
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
320+
321+
final String receiverIp = receiverDataNode.getIp();
322+
final int receiverPort = receiverDataNode.getPort();
323+
324+
try (final SyncConfigNodeIServiceClient client =
325+
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
326+
final Map<String, String> extractorAttributes = new HashMap<>();
327+
final Map<String, String> processorAttributes = new HashMap<>();
328+
final Map<String, String> connectorAttributes = new HashMap<>();
329+
330+
extractorAttributes.put("extractor.path", "root.db.d1.*");
331+
extractorAttributes.put("extractor.pattern", "root.db2.d1.s");
332+
extractorAttributes.put("extractor.inclusion", "data.insert");
333+
334+
connectorAttributes.put("connector", "iotdb-thrift-connector");
335+
connectorAttributes.put("connector.batch.enable", "false");
336+
connectorAttributes.put("connector.ip", receiverIp);
337+
connectorAttributes.put("connector.port", Integer.toString(receiverPort));
338+
339+
TestUtils.executeNonQueries(
340+
senderEnv,
341+
Arrays.asList(
342+
"insert into root.db.d1(time, s, s1) values (1, 1, 1)",
343+
"insert into root.db2.d1(time, s) values (2, 2)",
344+
"insert into root.db3.d1(time, s) values (3, 3)"),
345+
null);
346+
awaitUntilFlush(senderEnv);
347+
348+
final TSStatus status =
349+
client.createPipe(
350+
new TCreatePipeReq("p1", connectorAttributes)
351+
.setExtractorAttributes(extractorAttributes)
352+
.setProcessorAttributes(processorAttributes));
353+
354+
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
355+
356+
Assert.assertEquals(
357+
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
358+
359+
final Set<String> expectedResSet = new HashSet<>();
360+
expectedResSet.add("1,1.0,1.0,null,");
361+
expectedResSet.add("2,null,null,2.0,");
362+
363+
TestUtils.assertDataEventuallyOnEnv(
364+
receiverEnv,
365+
"select * from root.db.**,root.db2.**",
366+
"Time,root.db.d1.s,root.db.d1.s1,root.db2.d1.s,",
367+
expectedResSet);
368+
}
369+
}
370+
371+
@Test
372+
public void testMultiplePrefixPatternRealtimeData() throws Exception {
373+
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
374+
375+
final String receiverIp = receiverDataNode.getIp();
376+
final int receiverPort = receiverDataNode.getPort();
377+
378+
try (final SyncConfigNodeIServiceClient client =
379+
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
380+
final Map<String, String> extractorAttributes = new HashMap<>();
381+
final Map<String, String> processorAttributes = new HashMap<>();
382+
final Map<String, String> connectorAttributes = new HashMap<>();
383+
384+
extractorAttributes.put("extractor.pattern", "root.db.d1.s, root.db2.d1.s");
385+
extractorAttributes.put("extractor.inclusion", "data.insert");
386+
387+
connectorAttributes.put("connector", "iotdb-thrift-connector");
388+
connectorAttributes.put("connector.batch.enable", "false");
389+
connectorAttributes.put("connector.ip", receiverIp);
390+
connectorAttributes.put("connector.port", Integer.toString(receiverPort));
391+
392+
final TSStatus status =
393+
client.createPipe(
394+
new TCreatePipeReq("p1", connectorAttributes)
395+
.setExtractorAttributes(extractorAttributes)
396+
.setProcessorAttributes(processorAttributes));
397+
398+
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
399+
400+
Assert.assertEquals(
401+
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
402+
403+
TestUtils.executeNonQueries(
404+
senderEnv,
405+
Arrays.asList(
406+
"insert into root.db.d1(time, s, s1) values (1, 1, 1)",
407+
"insert into root.db.d2(time, s) values (2, 2)",
408+
"insert into root.db2.d1(time, s) values (3, 3)"),
409+
null);
410+
awaitUntilFlush(senderEnv);
411+
412+
final Set<String> expectedResSet = new HashSet<>();
413+
expectedResSet.add("1,null,1.0,1.0,");
414+
expectedResSet.add("3,3.0,null,null,");
415+
TestUtils.assertDataEventuallyOnEnv(
416+
receiverEnv,
417+
"select * from root.db2.**,root.db.**",
418+
"Time,root.db2.d1.s,root.db.d1.s,root.db.d1.s1,",
419+
expectedResSet);
420+
}
421+
}
422+
423+
@Test
424+
public void testMultipleIoTDBPatternRealtimeData() throws Exception {
425+
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
426+
427+
final String receiverIp = receiverDataNode.getIp();
428+
final int receiverPort = receiverDataNode.getPort();
429+
430+
try (final SyncConfigNodeIServiceClient client =
431+
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
432+
final Map<String, String> extractorAttributes = new HashMap<>();
433+
final Map<String, String> processorAttributes = new HashMap<>();
434+
final Map<String, String> connectorAttributes = new HashMap<>();
435+
436+
extractorAttributes.put("extractor.path", "root.db.**, root.db2.d1.*");
437+
extractorAttributes.put("extractor.inclusion", "data.insert");
438+
439+
connectorAttributes.put("connector", "iotdb-thrift-connector");
440+
connectorAttributes.put("connector.batch.enable", "false");
441+
connectorAttributes.put("connector.ip", receiverIp);
442+
connectorAttributes.put("connector.port", Integer.toString(receiverPort));
443+
444+
final TSStatus status =
445+
client.createPipe(
446+
new TCreatePipeReq("p1", connectorAttributes)
447+
.setExtractorAttributes(extractorAttributes)
448+
.setProcessorAttributes(processorAttributes));
449+
450+
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
451+
452+
Assert.assertEquals(
453+
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
454+
455+
TestUtils.executeNonQueries(
456+
senderEnv,
457+
Arrays.asList(
458+
"insert into root.db.d1(time, s, s1) values (1, 1, 1)",
459+
"insert into root.db.d2(time, s) values (2, 2)",
460+
"insert into root.db2.d1(time, s, t) values (3, 3, 3)",
461+
"insert into root.db3.d1(time, s) values (4, 4)"),
462+
null);
463+
awaitUntilFlush(senderEnv);
464+
465+
final Set<String> expectedResSet = new HashSet<>();
466+
expectedResSet.add("1,null,null,1.0,1.0,null,");
467+
expectedResSet.add("2,null,null,null,null,2.0,");
468+
expectedResSet.add("3,3.0,3.0,null,null,null,");
469+
TestUtils.assertDataEventuallyOnEnv(
470+
receiverEnv,
471+
"select * from root.db2.**,root.db.**",
472+
"Time,root.db2.d1.s,root.db2.d1.t,root.db.d1.s,root.db.d1.s1,root.db.d2.s,",
473+
expectedResSet);
474+
}
475+
}
476+
477+
@Test
478+
public void testMultipleHybridPatternRealtimeData() throws Exception {
479+
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
480+
481+
final String receiverIp = receiverDataNode.getIp();
482+
final int receiverPort = receiverDataNode.getPort();
483+
484+
try (final SyncConfigNodeIServiceClient client =
485+
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
486+
final Map<String, String> extractorAttributes = new HashMap<>();
487+
final Map<String, String> processorAttributes = new HashMap<>();
488+
final Map<String, String> connectorAttributes = new HashMap<>();
489+
490+
extractorAttributes.put("extractor.path", "root.db.d1.*");
491+
extractorAttributes.put("extractor.pattern", "root.db2.d1.s");
492+
extractorAttributes.put("extractor.inclusion", "data.insert");
493+
494+
connectorAttributes.put("connector", "iotdb-thrift-connector");
495+
connectorAttributes.put("connector.batch.enable", "false");
496+
connectorAttributes.put("connector.ip", receiverIp);
497+
connectorAttributes.put("connector.port", Integer.toString(receiverPort));
498+
499+
final TSStatus status =
500+
client.createPipe(
501+
new TCreatePipeReq("p1", connectorAttributes)
502+
.setExtractorAttributes(extractorAttributes)
503+
.setProcessorAttributes(processorAttributes));
504+
505+
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
506+
507+
Assert.assertEquals(
508+
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
509+
510+
TestUtils.executeNonQueries(
511+
senderEnv,
512+
Arrays.asList(
513+
"insert into root.db.d1(time, s, s1) values (1, 1, 1)",
514+
"insert into root.db2.d1(time, s) values (2, 2)",
515+
"insert into root.db3.d1(time, s) values (3, 3)"),
516+
null);
517+
awaitUntilFlush(senderEnv);
518+
519+
final Set<String> expectedResSet = new HashSet<>();
520+
expectedResSet.add("1,1.0,1.0,null,");
521+
expectedResSet.add("2,null,null,2.0,");
522+
523+
TestUtils.assertDataEventuallyOnEnv(
524+
receiverEnv,
525+
"select * from root.db.**,root.db2.**",
526+
"Time,root.db.d1.s,root.db.d1.s1,root.db2.d1.s,",
527+
expectedResSet);
528+
}
529+
}
212530
}

0 commit comments

Comments
 (0)