Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void testPrefixPattern() throws Exception {
}

@Test
public void testIotdbPattern() throws Exception {
public void testIoTDBPattern() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

final String receiverIp = receiverDataNode.getIp();
Expand All @@ -127,8 +127,6 @@ public void testIotdbPattern() throws Exception {
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.path", "root.**.d1.s*");
// When path is set, pattern should be ignored
extractorAttributes.put("extractor.pattern", "root");
extractorAttributes.put("extractor.inclusion", "data.insert");

connectorAttributes.put("connector", "iotdb-thrift-connector");
Expand Down Expand Up @@ -158,7 +156,7 @@ public void testIotdbPattern() throws Exception {
}

@Test
public void testIotdbPatternWithLegacySyntax() throws Exception {
public void testIoTDBPatternWithLegacySyntax() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

final String receiverIp = receiverDataNode.getIp();
Expand Down Expand Up @@ -209,4 +207,324 @@ public void testIotdbPatternWithLegacySyntax() throws Exception {
expectedResSet);
}
}

@Test
public void testMultiplePrefixPatternHistoricalData() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

final String receiverIp = receiverDataNode.getIp();
final int receiverPort = receiverDataNode.getPort();

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.pattern", "root.db.d1.s, root.db2.d1.s");
extractorAttributes.put("extractor.inclusion", "data.insert");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port", Integer.toString(receiverPort));

TestUtils.executeNonQueries(
senderEnv,
Arrays.asList(
"insert into root.db.d1(time, s, s1) values (1, 1, 1)",
"insert into root.db.d2(time, s) values (2, 2)",
"insert into root.db2.d1(time, s) values (3, 3)"),
null);
awaitUntilFlush(senderEnv);

final TSStatus status =
client.createPipe(
new TCreatePipeReq("p1", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));

Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());

final Set<String> expectedResSet = new HashSet<>();
expectedResSet.add("1,null,1.0,1.0,");
expectedResSet.add("3,3.0,null,null,");
TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"select * from root.db2.**,root.db.**",
"Time,root.db2.d1.s,root.db.d1.s,root.db.d1.s1,",
expectedResSet);
}
}

@Test
public void testMultipleIoTDBPatternHistoricalData() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

final String receiverIp = receiverDataNode.getIp();
final int receiverPort = receiverDataNode.getPort();

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.path", "root.db.**, root.db2.d1.*");
extractorAttributes.put("extractor.inclusion", "data.insert");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port", Integer.toString(receiverPort));

TestUtils.executeNonQueries(
senderEnv,
Arrays.asList(
"insert into root.db.d1(time, s, s1) values (1, 1, 1)",
"insert into root.db.d2(time, s) values (2, 2)",
"insert into root.db2.d1(time, s, t) values (3, 3, 3)",
"insert into root.db3.d1(time, s) values (4, 4)"),
null);
awaitUntilFlush(senderEnv);

final TSStatus status =
client.createPipe(
new TCreatePipeReq("p1", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));

Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());

final Set<String> expectedResSet = new HashSet<>();
expectedResSet.add("1,null,null,1.0,1.0,null,");
expectedResSet.add("2,null,null,null,null,2.0,");
expectedResSet.add("3,3.0,3.0,null,null,null,");
TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"select * from root.db2.**,root.db.**",
"Time,root.db2.d1.s,root.db2.d1.t,root.db.d1.s,root.db.d1.s1,root.db.d2.s,",
expectedResSet);
}
}

@Test
public void testMultipleHybridPatternHistoricalData() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

final String receiverIp = receiverDataNode.getIp();
final int receiverPort = receiverDataNode.getPort();

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.path", "root.db.d1.*");
extractorAttributes.put("extractor.pattern", "root.db2.d1.s");
extractorAttributes.put("extractor.inclusion", "data.insert");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port", Integer.toString(receiverPort));

TestUtils.executeNonQueries(
senderEnv,
Arrays.asList(
"insert into root.db.d1(time, s, s1) values (1, 1, 1)",
"insert into root.db2.d1(time, s) values (2, 2)",
"insert into root.db3.d1(time, s) values (3, 3)"),
null);
awaitUntilFlush(senderEnv);

final TSStatus status =
client.createPipe(
new TCreatePipeReq("p1", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));

Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());

final Set<String> expectedResSet = new HashSet<>();
expectedResSet.add("1,1.0,1.0,null,");
expectedResSet.add("2,null,null,2.0,");

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"select * from root.db.**,root.db2.**",
"Time,root.db.d1.s,root.db.d1.s1,root.db2.d1.s,",
expectedResSet);
}
}

@Test
public void testMultiplePrefixPatternRealtimeData() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

final String receiverIp = receiverDataNode.getIp();
final int receiverPort = receiverDataNode.getPort();

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.pattern", "root.db.d1.s, root.db2.d1.s");
extractorAttributes.put("extractor.inclusion", "data.insert");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port", Integer.toString(receiverPort));

final TSStatus status =
client.createPipe(
new TCreatePipeReq("p1", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));

Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());

TestUtils.executeNonQueries(
senderEnv,
Arrays.asList(
"insert into root.db.d1(time, s, s1) values (1, 1, 1)",
"insert into root.db.d2(time, s) values (2, 2)",
"insert into root.db2.d1(time, s) values (3, 3)"),
null);
awaitUntilFlush(senderEnv);

final Set<String> expectedResSet = new HashSet<>();
expectedResSet.add("1,null,1.0,1.0,");
expectedResSet.add("3,3.0,null,null,");
TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"select * from root.db2.**,root.db.**",
"Time,root.db2.d1.s,root.db.d1.s,root.db.d1.s1,",
expectedResSet);
}
}

@Test
public void testMultipleIoTDBPatternRealtimeData() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

final String receiverIp = receiverDataNode.getIp();
final int receiverPort = receiverDataNode.getPort();

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.path", "root.db.**, root.db2.d1.*");
extractorAttributes.put("extractor.inclusion", "data.insert");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port", Integer.toString(receiverPort));

final TSStatus status =
client.createPipe(
new TCreatePipeReq("p1", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));

Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());

TestUtils.executeNonQueries(
senderEnv,
Arrays.asList(
"insert into root.db.d1(time, s, s1) values (1, 1, 1)",
"insert into root.db.d2(time, s) values (2, 2)",
"insert into root.db2.d1(time, s, t) values (3, 3, 3)",
"insert into root.db3.d1(time, s) values (4, 4)"),
null);
awaitUntilFlush(senderEnv);

final Set<String> expectedResSet = new HashSet<>();
expectedResSet.add("1,null,null,1.0,1.0,null,");
expectedResSet.add("2,null,null,null,null,2.0,");
expectedResSet.add("3,3.0,3.0,null,null,null,");
TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"select * from root.db2.**,root.db.**",
"Time,root.db2.d1.s,root.db2.d1.t,root.db.d1.s,root.db.d1.s1,root.db.d2.s,",
expectedResSet);
}
}

@Test
public void testMultipleHybridPatternRealtimeData() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

final String receiverIp = receiverDataNode.getIp();
final int receiverPort = receiverDataNode.getPort();

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.path", "root.db.d1.*");
extractorAttributes.put("extractor.pattern", "root.db2.d1.s");
extractorAttributes.put("extractor.inclusion", "data.insert");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port", Integer.toString(receiverPort));

final TSStatus status =
client.createPipe(
new TCreatePipeReq("p1", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));

Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());

TestUtils.executeNonQueries(
senderEnv,
Arrays.asList(
"insert into root.db.d1(time, s, s1) values (1, 1, 1)",
"insert into root.db2.d1(time, s) values (2, 2)",
"insert into root.db3.d1(time, s) values (3, 3)"),
null);
awaitUntilFlush(senderEnv);

final Set<String> expectedResSet = new HashSet<>();
expectedResSet.add("1,1.0,1.0,null,");
expectedResSet.add("2,null,null,2.0,");

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"select * from root.db.**,root.db2.**",
"Time,root.db.d1.s,root.db.d1.s1,root.db2.d1.s,",
expectedResSet);
}
}
}
Loading
Loading