diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/TableModelUtils.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/TableModelUtils.java index 265548d80ccd..e03188d73762 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/TableModelUtils.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/TableModelUtils.java @@ -115,6 +115,39 @@ public static void insertData( TestUtils.executeNonQueries(dataBaseName, BaseEnv.TABLE_SQL_DIALECT, baseEnv, list, null); } + public static void insertData( + final String dataBaseName, + final String tableName, + final int deviceStartIndex, + final int deviceEndIndex, + final int startInclusive, + final int endExclusive, + final BaseEnv baseEnv) { + List list = new ArrayList<>(endExclusive - startInclusive + 1); + for (int deviceIndex = deviceStartIndex; deviceIndex < deviceEndIndex; ++deviceIndex) { + for (int i = startInclusive; i < endExclusive; ++i) { + list.add( + String.format( + "insert into %s (s0, s3, s2, s1, s4, s5, s6, s7, s8, s9, s10, s11, time) values ('t%s','t%s','t%s','t%s','%s', %s.0, %s, %s, %d, %d.0, '%s', '%s', %s)", + tableName, + deviceIndex, + deviceIndex, + deviceIndex, + deviceIndex, + i, + i, + i, + i, + i, + i, + getDateStr(i), + i, + i)); + } + } + TestUtils.executeNonQueries(dataBaseName, BaseEnv.TABLE_SQL_DIALECT, baseEnv, list, null); + } + public static void insertData( final String dataBaseName, final String tableName, diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeTsFileDecompositionWithModsIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeTsFileDecompositionWithModsIT.java new file mode 100644 index 000000000000..22e042e28538 --- /dev/null +++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeTsFileDecompositionWithModsIT.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.pipe.it.dual.tablemodel.manual.basic; + +import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.isession.SessionConfig; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.MultiClusterIT2DualTableManualBasic; +import org.apache.iotdb.pipe.it.dual.tablemodel.TableModelUtils; +import org.apache.iotdb.pipe.it.dual.tablemodel.manual.AbstractPipeTableModelDualManualIT; + +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.util.Collections; +import java.util.HashSet; + +import static org.apache.iotdb.db.it.utils.TestUtils.executeNonQueryWithRetry; + +@RunWith(IoTDBTestRunner.class) +@Category({MultiClusterIT2DualTableManualBasic.class}) +public class IoTDBPipeTsFileDecompositionWithModsIT extends AbstractPipeTableModelDualManualIT { + + /** + * Test IoTDB pipe handling TsFile decomposition with Mods (modification operations) in table + * model + * + *

Test scenario: 1. Create two storage groups sg1 and sg2, each containing table1 2. Insert + * small amount of data in sg1 (1-6 rows), insert large amount of data in sg2 (110 batches, 100 + * rows per batch) 3. Execute FLUSH operation to persist data to TsFile 4. Execute multiple DELETE + * operations on sg1, deleting data in time ranges 2-4 and 3-5 5. Execute multiple DELETE + * operations on sg2, deleting data matching specific conditions (s0-s3 field values) 6. Execute + * FLUSH operation again 7. Create pipe with mods enabled, synchronize data to receiver 8. Verify + * correctness of receiver data: - sg1 only retains time=1 data, time=2-4 data is correctly + * deleted - sg2 DELETE operation results meet expectations (t10 retains 1000 rows, t11 all + * deleted, t12 retains 5900 rows, etc.) + * + *

Test purpose: Verify that IoTDB pipe can correctly handle Mods (modification operations) in + * TsFile, ensuring DELETE operations can be correctly synchronized to the receiver and data + * consistency is guaranteed. + */ + @Test + public void testTsFileDecompositionWithMods() { + TableModelUtils.createDataBaseAndTable(senderEnv, "table1", "sg1"); + TableModelUtils.createDataBaseAndTable(receiverEnv, "table1", "sg1"); + + TableModelUtils.insertData("sg1", "table1", 1, 6, senderEnv); + + TableModelUtils.createDataBaseAndTable(senderEnv, "table1", "sg2"); + for (int i = 1; i <= 110; i++) { + TableModelUtils.insertData("sg2", "table1", 10, 15, (i - 1) * 100, i * 100, senderEnv); + } + + executeNonQueryWithRetry(senderEnv, "FLUSH"); + + executeNonQueryWithRetry( + senderEnv, + "DELETE FROM table1 WHERE time >= 2 AND time <= 4", + SessionConfig.DEFAULT_USER, + SessionConfig.DEFAULT_PASSWORD, + "sg1", + "table"); + + executeNonQueryWithRetry( + senderEnv, + "DELETE FROM table1 WHERE time >= 3 AND time <= 5", + SessionConfig.DEFAULT_USER, + SessionConfig.DEFAULT_PASSWORD, + "sg1", + "table"); + + executeNonQueryWithRetry( + senderEnv, + "DELETE FROM table1 WHERE time >= 0 AND time < 10000 AND s0 ='t10' AND s1='t10' AND s2='t10' AND s3='t10'", + SessionConfig.DEFAULT_USER, + SessionConfig.DEFAULT_PASSWORD, + "sg2", + "table"); + + executeNonQueryWithRetry( + senderEnv, + "DELETE FROM table1 WHERE time >= 0 AND time <= 11000 AND s0 ='t11' AND s1='t11' AND s2='t11' AND s3='t11'", + SessionConfig.DEFAULT_USER, + SessionConfig.DEFAULT_PASSWORD, + "sg2", + "table"); + + executeNonQueryWithRetry( + senderEnv, + "DELETE FROM table1 WHERE time >= 5000 AND time < 10100 AND s0 ='t12' AND s1='t12' AND s2='t12' AND s3='t12'", + SessionConfig.DEFAULT_USER, + SessionConfig.DEFAULT_PASSWORD, + "sg2", + "table"); + + executeNonQueryWithRetry( + senderEnv, + "DELETE FROM table1 WHERE time >= 0 AND time < 10000 AND s0 ='t13' AND s1='t13' AND s2='t13' AND s3='t13'", + 
SessionConfig.DEFAULT_USER, + SessionConfig.DEFAULT_PASSWORD, + "sg2", + "table"); + + executeNonQueryWithRetry( + senderEnv, + "DELETE FROM table1 WHERE time >= 10000 AND time <= 11000 AND s0 ='t14' AND s1='t14' AND s2='t14' AND s3='t14'", + SessionConfig.DEFAULT_USER, + SessionConfig.DEFAULT_PASSWORD, + "sg2", + "table"); + + executeNonQueryWithRetry(senderEnv, "FLUSH"); + + executeNonQueryWithRetry( + senderEnv, + String.format( + "CREATE PIPE test_pipe WITH SOURCE ('mods.enable'='true', 'capture.table'='true') WITH CONNECTOR('ip'='%s', 'port'='%s', 'username'='root', 'format'='tablet')", + receiverEnv.getDataNodeWrapperList().get(0).getIp(), + receiverEnv.getDataNodeWrapperList().get(0).getPort())); + + HashSet expectedResults = new HashSet<>(); + expectedResults.add( + "t1,t1,t1,t1,1,1.0,1,1970-01-01T00:00:00.001Z,1,1.0,1970-01-01,1,1970-01-01T00:00:00.001Z,"); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + TableModelUtils.getQuerySql("table1"), + TableModelUtils.generateHeaderResults(), + expectedResults, + "sg1"); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT s4 FROM table1 WHERE time >= 2 AND time <= 4", + "s4,", + Collections.emptySet(), + "sg1"); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(*) as count FROM table1 WHERE s0 ='t10' AND s1='t10' AND s2='t10' AND s3='t10'", + "count,", + Collections.singleton("1000,"), + "sg2"); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(*) as count FROM table1 WHERE s0 ='t11' AND s1='t11' AND s2='t11' AND s3='t11'", + "count,", + Collections.singleton("0,"), + "sg2"); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(*) as count FROM table1 WHERE s0 ='t12' AND s1='t12' AND s2='t12' AND s3='t12'", + "count,", + Collections.singleton("5900,"), + "sg2"); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(*) as count FROM table1 WHERE s0 ='t13' AND s1='t13' AND s2='t13' AND s3='t13'", + "count,", + 
Collections.singleton("1000,"), + "sg2"); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(*) as count FROM table1 WHERE s0 ='t14' AND s1='t14' AND s2='t14' AND s3='t14'", + "count,", + Collections.singleton("10000,"), + "sg2"); + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTsFileDecompositionWithModsIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTsFileDecompositionWithModsIT.java new file mode 100644 index 000000000000..b939ec79b8be --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTsFileDecompositionWithModsIT.java @@ -0,0 +1,660 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.pipe.it.dual.treemodel.manual; + +import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeManual; + +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.util.Collections; +import java.util.HashSet; + +import static org.apache.iotdb.db.it.utils.TestUtils.executeNonQueryWithRetry; + +@RunWith(IoTDBTestRunner.class) +@Category({MultiClusterIT2DualTreeManual.class}) +public class IoTDBPipeTsFileDecompositionWithModsIT extends AbstractPipeDualTreeModelManualIT { + + @Override + protected void setupConfig() { + super.setupConfig(); + senderEnv.getConfig().getConfigNodeConfig().setLeaderDistributionPolicy("HASH"); + senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true); + receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true); + } + + /** + * Test IoTDB pipe handling TsFile decomposition with Mods (modification operations) in tree model + * + *

Test scenario: 1. Create database root.sg1 with 4 devices: d1 (aligned timeseries), d2 + * (non-aligned timeseries), d3 (aligned timeseries), d4 (aligned timeseries) 2. Insert initial + * data into d1, d2, d3 3. Execute FLUSH operation to persist data to TsFile 4. Execute DELETE + * operation on d1.s1, deleting data in time range 2-4 5. Insert large amount of data into d4 + * (11000 records, inserted in batches) 6. Execute multiple DELETE operations on d4: - Delete s1 + * field data where time<=10000 - Delete s2 field data where time>1000 - Delete s3 field data + * where time<=8000 7. Delete all data from d2 and d3 8. Execute FLUSH operation again 9. Create + * pipe with mods enabled, synchronize data to receiver 10. Verify correctness of receiver data: - + * d1 s1 field is null in time range 2-4, other data is normal - d2 and d3 data is completely + * deleted - d4 DELETE operation results meet expectations for each field + * + *

Test purpose: Verify that IoTDB pipe can correctly handle Mods (modification operations) in + * TsFile under tree model, ensuring various DELETE operations can be correctly synchronized to + * the receiver and data consistency is guaranteed. + */ + @Test + public void testTsFileDecompositionWithMods() throws Exception { + TestUtils.executeNonQueryWithRetry(senderEnv, "CREATE DATABASE root.sg1"); + TestUtils.executeNonQueryWithRetry(receiverEnv, "CREATE DATABASE root.sg1"); + + TestUtils.executeNonQueryWithRetry( + senderEnv, "CREATE ALIGNED TIMESERIES root.sg1.d1(s1 FLOAT, s2 FLOAT, s3 FLOAT)"); + TestUtils.executeNonQueryWithRetry( + senderEnv, "CREATE TIMESERIES root.sg1.d2.s1 WITH DATATYPE=FLOAT"); + TestUtils.executeNonQueryWithRetry( + senderEnv, "CREATE TIMESERIES root.sg1.d2.s2 WITH DATATYPE=FLOAT"); + TestUtils.executeNonQueryWithRetry( + senderEnv, "CREATE ALIGNED TIMESERIES root.sg1.d3(s1 FLOAT, s2 FLOAT, s3 FLOAT)"); + + TestUtils.executeNonQueryWithRetry( + senderEnv, + "INSERT INTO root.sg1.d1(time, s1, s2, s3) ALIGNED VALUES (1, 1.0, 2.0, 3.0), (2, 1.1, 2.1, 3.1), (3, 1.2, 2.2, 3.2), (4, 1.3, 2.3, 3.3), (5, 1.4, 2.4, 3.4)"); + + TestUtils.executeNonQueryWithRetry( + senderEnv, + "INSERT INTO root.sg1.d2(time, s1, s2) VALUES (1, 10.0, 20.0), (2, 10.1, 20.1), (3, 10.2, 20.2), (4, 10.3, 20.3), (5, 10.4, 20.4)"); + + TestUtils.executeNonQueryWithRetry( + senderEnv, + "INSERT INTO root.sg1.d3(time, s1, s2, s3) ALIGNED VALUES (1, 100.0, 200.0, 300.0), (2, 100.1, 200.1, 300.1), (3, 100.2, 200.2, 300.2)"); + + TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH"); + + TestUtils.executeNonQueryWithRetry( + senderEnv, "DELETE FROM root.sg1.d1.s1 WHERE time >= 2 AND time <= 4"); + + TestUtils.executeNonQueryWithRetry( + senderEnv, + "INSERT INTO root.sg1.d3(time, s1, s2, s3) ALIGNED VALUES (1, 100.0, 200.0, 300.0), (2, 100.1, 200.1, 300.1), (3, 100.2, 200.2, 300.2)"); + + TestUtils.executeNonQueryWithRetry( + senderEnv, "CREATE ALIGNED TIMESERIES root.sg1.d4(s1 
FLOAT, s2 FLOAT, s3 FLOAT)"); + String s = "INSERT INTO root.sg1.d4(time, s1, s2, s3) ALIGNED VALUES "; + StringBuilder insertBuilder = new StringBuilder(s); + for (int i = 1; i <= 11000; i++) { + insertBuilder + .append("(") + .append(i) + .append(",") + .append(1.0f) + .append(",") + .append(2.0f) + .append(",") + .append(3.0f) + .append(")"); + if (i % 100 != 0) { + insertBuilder.append(","); + } else { + TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder.toString()); + insertBuilder = new StringBuilder(s); + } + } + + TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH"); + + TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d4.s1 WHERE time <= 10000"); + TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d4.s2 WHERE time > 1000"); + TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d4.s3 WHERE time <= 8000"); + + TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d2.*"); + + TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d3.*"); + + TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH"); + + // Verify sender data integrity before creating pipe to avoid leader election issues + // This ensures all data is properly persisted and consistent on sender side + // before starting the pipe synchronization process + HashSet results = new HashSet<>(); + results.add("1,3.0,1.0,2.0,"); + results.add("2,3.1,null,2.1,"); + results.add("3,3.2,null,2.2,"); + results.add("4,3.3,null,2.3,"); + results.add("5,3.4,1.4,2.4,"); + + TestUtils.assertDataEventuallyOnEnv( + senderEnv, + "SELECT * FROM root.sg1.d1 ORDER BY time", + "Time,root.sg1.d1.s3,root.sg1.d1.s1,root.sg1.d1.s2,", + results); + + executeNonQueryWithRetry( + senderEnv, + String.format( + "CREATE PIPE test_pipe WITH SOURCE ('mods.enable'='true') WITH CONNECTOR('ip'='%s', 'port'='%s', 'format'='tablet')", + receiverEnv.getDataNodeWrapperList().get(0).getIp(), + receiverEnv.getDataNodeWrapperList().get(0).getPort())); + 
+ TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT * FROM root.sg1.d1 ORDER BY time", + "Time,root.sg1.d1.s3,root.sg1.d1.s1,root.sg1.d1.s2,", + results); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, "SELECT * FROM root.sg1.d2 ORDER BY time", "Time,", Collections.emptySet()); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, "SELECT * FROM root.sg1.d3 ORDER BY time", "Time,", Collections.emptySet()); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT s1 FROM root.sg1.d1 WHERE time >= 2 AND time <= 4", + "Time,root.sg1.d1.s1,", + Collections.emptySet()); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(**) FROM root.sg1.d4", + "COUNT(root.sg1.d4.s3),COUNT(root.sg1.d4.s1),COUNT(root.sg1.d4.s2),", + Collections.singleton("3000,1000,1000,")); + } + + /** + * Test IoTDB pipe handling TsFile decomposition with Mods (modification operations) in tree model + * - Multi-pipe scenario + * + *

Test scenario: 1. Create database root.sg1 with 4 devices: d1 (aligned timeseries), d2 + * (non-aligned timeseries), d3 (aligned timeseries), d4 (aligned timeseries) 2. Insert initial + * data into d1, d2, d3 3. Execute FLUSH operation to persist data to TsFile 4. Execute DELETE + * operation on d1.s1, deleting data in time range 2-4 5. Insert large amount of data into d4 + * (11000 records, inserted in batches) 6. Execute multiple DELETE operations on d4: - Delete s1 + * field data where time<=10000 - Delete s2 field data where time>1000 - Delete s3 field data + * where time<=8000 7. Delete all data from d2 and d3 8. Execute FLUSH operation again 9. Create 4 + * independent pipes, each targeting different device paths: - test_pipe1: handles data for + * root.sg1.d1.** path - test_pipe2: handles data for root.sg1.d2.** path - test_pipe3: handles + * data for root.sg1.d3.** path - test_pipe4: handles data for root.sg1.d4.** path 10. Verify + * correctness of receiver data: - d1 s1 field is null in time range 2-4, other data is normal - + * d2 and d3 data is completely deleted - d4 DELETE operation results meet expectations for each + * field + * + *

Test purpose: Verify that IoTDB pipe can correctly handle Mods (modification operations) in + * TsFile under tree model through multiple independent pipes, ensuring DELETE operations for + * different paths can be correctly synchronized to the receiver and data consistency is + * guaranteed. The main difference from the first test method is using multiple pipes to handle + * data for different devices separately. + */ + @Test + public void testTsFileDecompositionWithMods2() throws Exception { + TestUtils.executeNonQueryWithRetry(senderEnv, "CREATE DATABASE root.sg1"); + TestUtils.executeNonQueryWithRetry(receiverEnv, "CREATE DATABASE root.sg1"); + + TestUtils.executeNonQueryWithRetry( + senderEnv, "CREATE ALIGNED TIMESERIES root.sg1.d1(s1 FLOAT, s2 FLOAT, s3 FLOAT)"); + TestUtils.executeNonQueryWithRetry( + senderEnv, "CREATE TIMESERIES root.sg1.d2.s1 WITH DATATYPE=FLOAT"); + TestUtils.executeNonQueryWithRetry( + senderEnv, "CREATE TIMESERIES root.sg1.d2.s2 WITH DATATYPE=FLOAT"); + TestUtils.executeNonQueryWithRetry( + senderEnv, "CREATE ALIGNED TIMESERIES root.sg1.d3(s1 FLOAT, s2 FLOAT, s3 FLOAT)"); + + TestUtils.executeNonQueryWithRetry( + senderEnv, + "INSERT INTO root.sg1.d1(time, s1, s2, s3) ALIGNED VALUES (1, 1.0, 2.0, 3.0), (2, 1.1, 2.1, 3.1), (3, 1.2, 2.2, 3.2), (4, 1.3, 2.3, 3.3), (5, 1.4, 2.4, 3.4)"); + + TestUtils.executeNonQueryWithRetry( + senderEnv, + "INSERT INTO root.sg1.d2(time, s1, s2) VALUES (1, 10.0, 20.0), (2, 10.1, 20.1), (3, 10.2, 20.2), (4, 10.3, 20.3), (5, 10.4, 20.4)"); + + TestUtils.executeNonQueryWithRetry( + senderEnv, + "INSERT INTO root.sg1.d3(time, s1, s2, s3) ALIGNED VALUES (1, 100.0, 200.0, 300.0), (2, 100.1, 200.1, 300.1), (3, 100.2, 200.2, 300.2)"); + + TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH"); + + TestUtils.executeNonQueryWithRetry( + senderEnv, "DELETE FROM root.sg1.d1.s1 WHERE time >= 2 AND time <= 4"); + + TestUtils.executeNonQueryWithRetry( + senderEnv, + "INSERT INTO root.sg1.d3(time, s1, s2, s3) ALIGNED 
VALUES (1, 100.0, 200.0, 300.0), (2, 100.1, 200.1, 300.1), (3, 100.2, 200.2, 300.2)"); + + TestUtils.executeNonQueryWithRetry( + senderEnv, "CREATE ALIGNED TIMESERIES root.sg1.d4(s1 FLOAT, s2 FLOAT, s3 FLOAT)"); + String s = "INSERT INTO root.sg1.d4(time, s1, s2, s3) ALIGNED VALUES "; + StringBuilder insertBuilder = new StringBuilder(s); + for (int i = 1; i <= 11000; i++) { + insertBuilder + .append("(") + .append(i) + .append(",") + .append(1.0f) + .append(",") + .append(2.0f) + .append(",") + .append(3.0f) + .append(")"); + if (i % 100 != 0) { + insertBuilder.append(","); + } else { + TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder.toString()); + insertBuilder = new StringBuilder(s); + } + } + + TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH"); + + TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d4.s1 WHERE time <= 10000"); + TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d4.s2 WHERE time > 1000"); + TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d4.s3 WHERE time <= 8000"); + + TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d2.*"); + + TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d3.*"); + + TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH"); + + // Verify sender data integrity before creating pipes to avoid leader election issues + // This ensures all data is properly persisted and consistent on sender side + // before starting the pipe synchronization process + HashSet results = new HashSet<>(); + results.add("1,3.0,1.0,2.0,"); + results.add("2,3.1,null,2.1,"); + results.add("3,3.2,null,2.2,"); + results.add("4,3.3,null,2.3,"); + results.add("5,3.4,1.4,2.4,"); + + TestUtils.assertDataEventuallyOnEnv( + senderEnv, + "SELECT * FROM root.sg1.d1 ORDER BY time", + "Time,root.sg1.d1.s3,root.sg1.d1.s1,root.sg1.d1.s2,", + results); + + executeNonQueryWithRetry( + senderEnv, + String.format( + "CREATE PIPE test_pipe1 WITH SOURCE 
('mods.enable'='true','path'='root.sg1.d1.**') WITH CONNECTOR('ip'='%s', 'port'='%s', 'format'='tablet')", + receiverEnv.getDataNodeWrapperList().get(0).getIp(), + receiverEnv.getDataNodeWrapperList().get(0).getPort())); + + executeNonQueryWithRetry( + senderEnv, + String.format( + "CREATE PIPE test_pipe2 WITH SOURCE ('mods.enable'='true','path'='root.sg1.d2.**') WITH CONNECTOR('ip'='%s', 'port'='%s', 'format'='tablet')", + receiverEnv.getDataNodeWrapperList().get(0).getIp(), + receiverEnv.getDataNodeWrapperList().get(0).getPort())); + + executeNonQueryWithRetry( + senderEnv, + String.format( + "CREATE PIPE test_pipe3 WITH SOURCE ('mods.enable'='true','path'='root.sg1.d3.**') WITH CONNECTOR('ip'='%s', 'port'='%s', 'format'='tablet')", + receiverEnv.getDataNodeWrapperList().get(0).getIp(), + receiverEnv.getDataNodeWrapperList().get(0).getPort())); + + executeNonQueryWithRetry( + senderEnv, + String.format( + "CREATE PIPE test_pipe4 WITH SOURCE ('mods.enable'='true','path'='root.sg1.d4.**') WITH CONNECTOR('ip'='%s', 'port'='%s', 'format'='tablet')", + receiverEnv.getDataNodeWrapperList().get(0).getIp(), + receiverEnv.getDataNodeWrapperList().get(0).getPort())); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT * FROM root.sg1.d1 ORDER BY time", + "Time,root.sg1.d1.s3,root.sg1.d1.s1,root.sg1.d1.s2,", + results); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, "SELECT * FROM root.sg1.d2 ORDER BY time", "Time,", Collections.emptySet()); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, "SELECT * FROM root.sg1.d3 ORDER BY time", "Time,", Collections.emptySet()); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT s1 FROM root.sg1.d1 WHERE time >= 2 AND time <= 4", + "Time,root.sg1.d1.s1,", + Collections.emptySet()); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(**) FROM root.sg1.d4", + "COUNT(root.sg1.d4.s3),COUNT(root.sg1.d4.s1),COUNT(root.sg1.d4.s2),", + Collections.singleton("3000,1000,1000,")); + } 
+ + /** + * Test IoTDB pipe handling TsFile decomposition with Mods (modification operations) in tree model + * - Large scale single point deletion scenario + * + *

Test scenario: 1. Create database root.sg1 with 1 device: d1 (aligned timeseries with 5 + * sensors) 2. Insert 20000 data points for each sensor with different time ranges: - s1: time + * 1-20000 - s2: time 10001-30000 - s3: time 20001-40000 - s4: time 30001-50000 - s5: time + * 40001-60000 3. Execute FLUSH operation to persist data to TsFile 4. Execute 2000 single point + * DELETE operations, each deleting one time point from different sensors 5. Execute FLUSH + * operation again 6. Create pipe with mods enabled 7. Verify correctness of receiver data: - Each + * sensor should have 19600 remaining data points - Deleted points should not appear in receiver + * + *

Test purpose: Verify that IoTDB pipe can correctly handle large scale single point deletion + * operations in TsFile under tree model, ensuring the binary search optimization in + * ModsOperationUtil works correctly with many modification entries. + */ + @Test + public void testTsFileDecompositionWithModsLargeScaleSinglePointDeletion() throws Exception { + TestUtils.executeNonQueryWithRetry(senderEnv, "CREATE DATABASE root.sg1"); + TestUtils.executeNonQueryWithRetry(receiverEnv, "CREATE DATABASE root.sg1"); + + // Insert 20000 data points for s1 (time 1-20000) + String s1 = "INSERT INTO root.sg1.d1(time, s1) ALIGNED VALUES "; + StringBuilder insertBuilder1 = new StringBuilder(s1); + for (int i = 1; i <= 20000; i++) { + insertBuilder1.append("(").append(i).append(",").append(1.0f).append(")"); + if (i % 1000 != 0) { + insertBuilder1.append(","); + } else { + TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder1.toString()); + insertBuilder1 = new StringBuilder(s1); + } + } + // Execute remaining data if any + if (insertBuilder1.length() > s1.length()) { + TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder1.toString()); + } + + // Insert 20000 data points for s2 (time 10001-30000) + String s2 = "INSERT INTO root.sg1.d1(time, s2) ALIGNED VALUES "; + StringBuilder insertBuilder2 = new StringBuilder(s2); + for (int i = 10001; i <= 30000; i++) { + insertBuilder2.append("(").append(i).append(",").append(2.0f).append(")"); + if (i % 1000 != 0) { + insertBuilder2.append(","); + } else { + TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder2.toString()); + insertBuilder2 = new StringBuilder(s2); + } + } + // Execute remaining data if any + if (insertBuilder2.length() > s2.length()) { + TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder2.toString()); + } + + // Insert 20000 data points for s3 (time 20001-40000) + String s3 = "INSERT INTO root.sg1.d1(time, s3) ALIGNED VALUES "; + StringBuilder insertBuilder3 = new StringBuilder(s3); + for (int 
i = 20001; i <= 40000; i++) { + insertBuilder3.append("(").append(i).append(",").append(3.0f).append(")"); + if (i % 1000 != 0) { + insertBuilder3.append(","); + } else { + TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder3.toString()); + insertBuilder3 = new StringBuilder(s3); + } + } + // Execute remaining data if any + if (insertBuilder3.length() > s3.length()) { + TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder3.toString()); + } + + // Insert 20000 data points for s4 (time 30001-50000) + String s4 = "INSERT INTO root.sg1.d1(time, s4) ALIGNED VALUES "; + StringBuilder insertBuilder4 = new StringBuilder(s4); + for (int i = 30001; i <= 50000; i++) { + insertBuilder4.append("(").append(i).append(",").append(4.0f).append(")"); + if (i % 1000 != 0) { + insertBuilder4.append(","); + } else { + TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder4.toString()); + insertBuilder4 = new StringBuilder(s4); + } + } + // Execute remaining data if any + if (insertBuilder4.length() > s4.length()) { + TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder4.toString()); + } + + // Insert 20000 data points for s5 (time 40001-60000) + String s5 = "INSERT INTO root.sg1.d1(time, s5) ALIGNED VALUES "; + StringBuilder insertBuilder5 = new StringBuilder(s5); + for (int i = 40001; i <= 60000; i++) { + insertBuilder5.append("(").append(i).append(",").append(5.0f).append(")"); + if (i % 1000 != 0) { + insertBuilder5.append(","); + } else { + TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder5.toString()); + insertBuilder5 = new StringBuilder(s5); + } + } + // Execute remaining data if any + if (insertBuilder5.length() > s5.length()) { + TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder5.toString()); + } + + TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH"); + + // Execute 2000 single point DELETE operations + // Delete 400 points from each sensor (distributed across different time ranges) + for (int i = 0; i < 400; i++) { + // 
Delete from s1: every 10th point starting from 10 + TestUtils.executeNonQueryWithRetry( + senderEnv, "DELETE FROM root.sg1.d1.s1 WHERE time = " + (10 + i * 10)); + + // Delete from s2: every 10th point starting from 10010 + TestUtils.executeNonQueryWithRetry( + senderEnv, "DELETE FROM root.sg1.d1.s2 WHERE time = " + (10010 + i * 10)); + + // Delete from s3: every 10th point starting from 20010 + TestUtils.executeNonQueryWithRetry( + senderEnv, "DELETE FROM root.sg1.d1.s3 WHERE time = " + (20010 + i * 10)); + + // Delete from s4: every 10th point starting from 30010 + TestUtils.executeNonQueryWithRetry( + senderEnv, "DELETE FROM root.sg1.d1.s4 WHERE time = " + (30010 + i * 10)); + + // Delete from s5: every 10th point starting from 40010 + TestUtils.executeNonQueryWithRetry( + senderEnv, "DELETE FROM root.sg1.d1.s5 WHERE time = " + (40010 + i * 10)); + } + + TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH"); + + // Verify sender data integrity before creating pipe to avoid leader election issues + // This ensures all data is properly persisted and consistent on sender side + // before starting the pipe synchronization process + TestUtils.assertDataEventuallyOnEnv( + senderEnv, + "SELECT COUNT(**) FROM root.sg1.d1", + "COUNT(root.sg1.d1.s3),COUNT(root.sg1.d1.s4),COUNT(root.sg1.d1.s5),COUNT(root.sg1.d1.s1),COUNT(root.sg1.d1.s2),", + Collections.singleton("19600,19600,19600,19600,19600,")); + + executeNonQueryWithRetry( + senderEnv, + String.format( + "CREATE PIPE test_pipe WITH SOURCE ('mods.enable'='true') WITH CONNECTOR('ip'='%s', 'port'='%s', 'format'='tablet')", + receiverEnv.getDataNodeWrapperList().get(0).getIp(), + receiverEnv.getDataNodeWrapperList().get(0).getPort())); + + // Verify total count of all sensors using COUNT(*) + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(**) FROM root.sg1.d1", + "COUNT(root.sg1.d1.s3),COUNT(root.sg1.d1.s4),COUNT(root.sg1.d1.s5),COUNT(root.sg1.d1.s1),COUNT(root.sg1.d1.s2),", + 
Collections.singleton("19600,19600,19600,19600,19600,")); + + // Verify individual sensor counts + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s1) FROM root.sg1.d1", + "COUNT(root.sg1.d1.s1),", + Collections.singleton("19600,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s2) FROM root.sg1.d1", + "COUNT(root.sg1.d1.s2),", + Collections.singleton("19600,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s3) FROM root.sg1.d1", + "COUNT(root.sg1.d1.s3),", + Collections.singleton("19600,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s4) FROM root.sg1.d1", + "COUNT(root.sg1.d1.s4),", + Collections.singleton("19600,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s5) FROM root.sg1.d1", + "COUNT(root.sg1.d1.s5),", + Collections.singleton("19600,")); + + // Verify count of deleted time ranges using COUNT with WHERE clause + // These should return 0 since all points in these ranges were deleted + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s1) FROM root.sg1.d1 WHERE time >= 10 AND time <= 4000 AND time % 10 = 0", + "COUNT(root.sg1.d1.s1),", + Collections.singleton("0,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s2) FROM root.sg1.d1 WHERE time >= 10010 AND time <= 14000 AND time % 10 = 0", + "COUNT(root.sg1.d1.s2),", + Collections.singleton("0,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s3) FROM root.sg1.d1 WHERE time >= 20010 AND time <= 24000 AND time % 10 = 0", + "COUNT(root.sg1.d1.s3),", + Collections.singleton("0,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s4) FROM root.sg1.d1 WHERE time >= 30010 AND time <= 34000 AND time % 10 = 0", + "COUNT(root.sg1.d1.s4),", + Collections.singleton("0,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s5) FROM root.sg1.d1 WHERE time >= 40010 AND time <= 
44000 AND time % 10 = 0", + "COUNT(root.sg1.d1.s5),", + Collections.singleton("0,")); + + // Verify count of non-deleted time ranges using multiple range queries + // Check ranges before deletion area + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s1) FROM root.sg1.d1 WHERE time >= 1 AND time < 10", + "COUNT(root.sg1.d1.s1),", + Collections.singleton("9,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s2) FROM root.sg1.d1 WHERE time >= 10001 AND time < 10010", + "COUNT(root.sg1.d1.s2),", + Collections.singleton("9,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s3) FROM root.sg1.d1 WHERE time >= 20001 AND time < 20010", + "COUNT(root.sg1.d1.s3),", + Collections.singleton("9,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s4) FROM root.sg1.d1 WHERE time >= 30001 AND time < 30010", + "COUNT(root.sg1.d1.s4),", + Collections.singleton("9,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s5) FROM root.sg1.d1 WHERE time >= 40001 AND time < 40010", + "COUNT(root.sg1.d1.s5),", + Collections.singleton("9,")); + + // Check ranges after deletion area + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s1) FROM root.sg1.d1 WHERE time > 4000 AND time <= 20000", + "COUNT(root.sg1.d1.s1),", + Collections.singleton("16000,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s2) FROM root.sg1.d1 WHERE time > 14000 AND time <= 30000", + "COUNT(root.sg1.d1.s2),", + Collections.singleton("16000,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s3) FROM root.sg1.d1 WHERE time > 24000 AND time <= 40000", + "COUNT(root.sg1.d1.s3),", + Collections.singleton("16000,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s4) FROM root.sg1.d1 WHERE time > 34000 AND time <= 50000", + "COUNT(root.sg1.d1.s4),", + Collections.singleton("16000,")); + + 
TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s5) FROM root.sg1.d1 WHERE time > 44000 AND time <= 60000", + "COUNT(root.sg1.d1.s5),", + Collections.singleton("16000,")); + + // Check non-deleted points within deletion range (every 10th point except the ones we deleted) + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s1) FROM root.sg1.d1 WHERE time >= 10 AND time <= 4000 AND time % 10 != 0", + "COUNT(root.sg1.d1.s1),", + Collections.singleton("3591,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s2) FROM root.sg1.d1 WHERE time >= 10010 AND time <= 14000 AND time % 10 != 0", + "COUNT(root.sg1.d1.s2),", + Collections.singleton("3591,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s3) FROM root.sg1.d1 WHERE time >= 20010 AND time <= 24000 AND time % 10 != 0", + "COUNT(root.sg1.d1.s3),", + Collections.singleton("3591,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s4) FROM root.sg1.d1 WHERE time >= 30010 AND time <= 34000 AND time % 10 != 0", + "COUNT(root.sg1.d1.s4),", + Collections.singleton("3591,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s5) FROM root.sg1.d1 WHERE time >= 40010 AND time <= 44000 AND time % 10 != 0", + "COUNT(root.sg1.d1.s5),", + Collections.singleton("3591,")); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index 3617b7347ad9..2a1ab3f5a35f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -700,7 +700,7 @@ private TsFileInsertionEventParser initEventParser() { // To avoid renaming of the tsFile database 
shouldParse4Privilege ? userName : null, this) - .provide()); + .provide(isWithMod)); return eventParser.get(); } catch (final IOException e) { close(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java index 358103175fe7..b68d9492426c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.pipe.event.common.tsfile.parser; +import org.apache.iotdb.commons.path.PatternTreeMap; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; @@ -27,6 +28,8 @@ import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; +import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; +import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.tsfile.read.TsFileSequenceReader; @@ -53,6 +56,10 @@ public abstract class TsFileInsertionEventParser implements AutoCloseable { protected final PipeTaskMeta pipeTaskMeta; // used to report progress protected final PipeInsertionEvent sourceEvent; // used to report progress + // mods entry + protected PipeMemoryBlock allocatedMemoryBlockForModifications; + protected PatternTreeMap currentModifications; + protected final long initialTimeNano = System.nanoTime(); protected boolean timeUsageReported = false; 
@@ -124,5 +131,14 @@ public void close() { if (allocatedMemoryBlockForTablet != null) { allocatedMemoryBlockForTablet.close(); } + + if (currentModifications != null) { + // help GC + currentModifications = null; + } + + if (allocatedMemoryBlockForModifications != null) { + allocatedMemoryBlockForModifications.close(); + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java index a965c817b994..640d8cafd310 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java @@ -78,7 +78,7 @@ public TsFileInsertionEventParserProvider( this.sourceEvent = sourceEvent; } - public TsFileInsertionEventParser provide() throws IOException { + public TsFileInsertionEventParser provide(final boolean isWithMod) throws IOException { if (pipeName != null) { PipeTsFileToTabletsMetrics.getInstance() .markTsFileToTabletInvocation(pipeName + "_" + creationTime); @@ -94,7 +94,8 @@ public TsFileInsertionEventParser provide() throws IOException { endTime, pipeTaskMeta, userName, - sourceEvent); + sourceEvent, + isWithMod); } // Use scan container to save memory @@ -109,7 +110,8 @@ public TsFileInsertionEventParser provide() throws IOException { startTime, endTime, pipeTaskMeta, - sourceEvent); + sourceEvent, + isWithMod); } if (treePattern instanceof IoTDBTreePattern @@ -128,7 +130,8 @@ public TsFileInsertionEventParser provide() throws IOException { startTime, endTime, pipeTaskMeta, - sourceEvent); + sourceEvent, + isWithMod); } final Map deviceIsAlignedMap = @@ -144,7 +147,8 @@ public TsFileInsertionEventParser provide() throws IOException { startTime, endTime, 
pipeTaskMeta, - sourceEvent); + sourceEvent, + isWithMod); } final int originalSize = deviceIsAlignedMap.size(); @@ -161,7 +165,8 @@ public TsFileInsertionEventParser provide() throws IOException { startTime, endTime, pipeTaskMeta, - sourceEvent) + sourceEvent, + isWithMod) : new TsFileInsertionEventQueryParser( pipeName, creationTime, @@ -171,7 +176,8 @@ public TsFileInsertionEventParser provide() throws IOException { endTime, pipeTaskMeta, sourceEvent, - filteredDeviceIsAlignedMap); + filteredDeviceIsAlignedMap, + isWithMod); } private Map filterDeviceIsAlignedMapByPattern( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java index d61f7a791ca5..dbed85d5a6b9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java @@ -26,15 +26,18 @@ import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParser; +import org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil; import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager; +import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.exception.PipeException; import 
org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.TimeseriesMetadata; import org.apache.tsfile.read.TsFileDeviceIterator; import org.apache.tsfile.read.TsFileReader; import org.apache.tsfile.read.TsFileSequenceReader; @@ -75,7 +78,7 @@ public TsFileInsertionEventQueryParser( final long endTime, final PipeInsertionEvent sourceEvent) throws IOException { - this(null, 0, tsFile, pattern, startTime, endTime, null, sourceEvent); + this(null, 0, tsFile, pattern, startTime, endTime, null, sourceEvent, false); } public TsFileInsertionEventQueryParser( @@ -86,7 +89,8 @@ public TsFileInsertionEventQueryParser( final long startTime, final long endTime, final PipeTaskMeta pipeTaskMeta, - final PipeInsertionEvent sourceEvent) + final PipeInsertionEvent sourceEvent, + final boolean isWithMod) throws IOException { this( pipeName, @@ -97,7 +101,8 @@ public TsFileInsertionEventQueryParser( endTime, pipeTaskMeta, sourceEvent, - null); + null, + isWithMod); } public TsFileInsertionEventQueryParser( @@ -109,11 +114,20 @@ public TsFileInsertionEventQueryParser( final long endTime, final PipeTaskMeta pipeTaskMeta, final PipeInsertionEvent sourceEvent, - final Map deviceIsAlignedMap) + final Map deviceIsAlignedMap, + final boolean isWithMod) throws IOException { super(pipeName, creationTime, pattern, null, startTime, endTime, pipeTaskMeta, sourceEvent); try { + currentModifications = + isWithMod + ? 
ModsOperationUtil.loadModificationsFromTsFile(tsFile) + : PatternTreeMapFactory.getModsPatternTreeMap(); + allocatedMemoryBlockForModifications = + PipeDataNodeResourceManager.memory() + .forceAllocateForTabletWithRetry(currentModifications.ramBytesUsed()); + final PipeTsFileResourceManager tsFileResourceManager = PipeDataNodeResourceManager.tsfile(); final Map> deviceMeasurementsMap; @@ -158,6 +172,60 @@ public TsFileInsertionEventQueryParser( allocatedMemoryBlock = PipeDataNodeResourceManager.memory().forceAllocate(memoryRequiredInBytes); + final Iterator>> iterator = + deviceMeasurementsMap.entrySet().iterator(); + while (isWithMod && iterator.hasNext()) { + final Map.Entry> entry = iterator.next(); + final IDeviceID deviceId = entry.getKey(); + final List measurements = entry.getValue(); + + // Check if deviceId is deleted + if (deviceId == null) { + LOGGER.warn("Found null deviceId, removing entry"); + iterator.remove(); + continue; + } + + // Check if measurements list is deleted or empty + if (measurements == null || measurements.isEmpty()) { + iterator.remove(); + continue; + } + + if (!currentModifications.isEmpty()) { + // Safely filter measurements, remove non-existent measurements + measurements.removeIf( + measurement -> { + if (measurement == null) { + return true; + } + + try { + TimeseriesMetadata meta = + tsFileSequenceReader.readTimeseriesMetadata(deviceId, measurement, true); + return ModsOperationUtil.isAllDeletedByMods( + deviceId, + measurement, + meta.getStatistics().getStartTime(), + meta.getStatistics().getEndTime(), + currentModifications); + } catch (IOException e) { + LOGGER.warn( + "Failed to read metadata for deviceId: {}, measurement: {}, removing", + deviceId, + measurement, + e); + return true; + } + }); + } + + // If measurements list is empty after filtering, remove the entire entry + if (measurements.isEmpty()) { + iterator.remove(); + } + } + // Filter again to get the final deviceMeasurementsMap that exactly matches the 
pattern. deviceMeasurementsMapIterator = filterDeviceMeasurementsMapByPattern(deviceMeasurementsMap).entrySet().iterator(); @@ -303,7 +371,8 @@ public boolean hasNext() { entry.getKey(), entry.getValue(), timeFilterExpression, - allocatedMemoryBlockForTablet); + allocatedMemoryBlockForTablet, + currentModifications); } catch (final Exception e) { close(); throw new PipeException( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParserTabletIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParserTabletIterator.java index 7c32321186e3..776b5e1e6fac 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParserTabletIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParserTabletIterator.java @@ -19,9 +19,13 @@ package org.apache.iotdb.db.pipe.event.common.tsfile.parser.query; +import org.apache.iotdb.commons.path.PatternTreeMap; +import org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil; +import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; +import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.tsfile.common.constant.TsFileConstant; @@ -62,6 +66,9 @@ public class TsFileInsertionEventQueryParserTabletIterator implements Iterator measurementModsList; + private RowRecord rowRecord; TsFileInsertionEventQueryParserTabletIterator( @@ -70,7 +77,8 @@ public class TsFileInsertionEventQueryParserTabletIterator implements 
Iterator measurements, final IExpression timeFilterExpression, - final PipeMemoryBlock allocatedBlockForTablet) + final PipeMemoryBlock allocatedBlockForTablet, + final PatternTreeMap currentModifications) throws IOException { this.tsFileReader = tsFileReader; this.measurementDataTypeMap = measurementDataTypeMap; @@ -90,6 +98,10 @@ public class TsFileInsertionEventQueryParserTabletIterator implements Iterator fields = rowRecord.getFields(); final int fieldSize = fields.size(); for (int i = 0; i < fieldSize; i++) { final Field field = fields.get(i); - tablet.addValue( - measurements.get(i), - rowIndex, - field == null ? null : field.getObjectValue(schemas.get(i).getType())); + final String measurement = measurements.get(i); + // Check if this value is deleted by mods + if (field == null + || ModsOperationUtil.isDelete(rowRecord.getTimestamp(), measurementModsList.get(i))) { + tablet.getBitMaps()[i].mark(rowIndex); + } else { + tablet.addValue(measurement, rowIndex, field.getObjectValue(schemas.get(i).getType())); + isNeedFillTime = true; + } + } + if (isNeedFillTime) { + tablet.addTimestamp(rowIndex, rowRecord.getTimestamp()); } if (tablet.getRowSize() == tablet.getMaxRowNumber()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java index 47aba940a725..0ccac56be11c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java @@ -25,9 +25,11 @@ import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParser; +import org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil; +import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.exception.PipeException; @@ -36,7 +38,10 @@ import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.MetaMarker; import org.apache.tsfile.file.header.ChunkHeader; +import org.apache.tsfile.file.metadata.AlignedChunkMetadata; +import org.apache.tsfile.file.metadata.IChunkMetadata; import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.statistics.Statistics; import org.apache.tsfile.read.TsFileSequenceReader; import org.apache.tsfile.read.common.BatchData; import org.apache.tsfile.read.common.Chunk; @@ -44,6 +49,7 @@ import org.apache.tsfile.read.reader.IChunkReader; import org.apache.tsfile.read.reader.chunk.AlignedChunkReader; import org.apache.tsfile.read.reader.chunk.ChunkReader; +import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.DateUtils; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.TsPrimitiveType; @@ -55,6 +61,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -77,7 +84,7 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { private IDeviceID currentDevice; private boolean currentIsAligned; private final List currentMeasurements = new ArrayList<>(); - + private final List modsInfos = new ArrayList<>(); // Cached time chunk private final List timeChunkList = new 
ArrayList<>(); private final List isMultiPageList = new ArrayList<>(); @@ -96,7 +103,8 @@ public TsFileInsertionEventScanParser( final long startTime, final long endTime, final PipeTaskMeta pipeTaskMeta, - final PipeInsertionEvent sourceEvent) + final PipeInsertionEvent sourceEvent, + final boolean isWithMod) throws IOException { super(pipeName, creationTime, pattern, null, startTime, endTime, pipeTaskMeta, sourceEvent); @@ -114,7 +122,19 @@ public TsFileInsertionEventScanParser( PipeConfig.getInstance().getPipeMaxAlignedSeriesChunkSizeInOneBatch()); try { - tsFileSequenceReader = new TsFileSequenceReader(tsFile.getAbsolutePath(), false, false); + currentModifications = + isWithMod + ? ModsOperationUtil.loadModificationsFromTsFile(tsFile) + : PatternTreeMapFactory.getModsPatternTreeMap(); + allocatedMemoryBlockForModifications = + PipeDataNodeResourceManager.memory() + .forceAllocateForTabletWithRetry(currentModifications.ramBytesUsed()); + + tsFileSequenceReader = + new TsFileSequenceReader( + tsFile.getAbsolutePath(), + !currentModifications.isEmpty(), + !currentModifications.isEmpty()); tsFileSequenceReader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length + 1); prepareData(); @@ -130,9 +150,10 @@ public TsFileInsertionEventScanParser( final long startTime, final long endTime, final PipeTaskMeta pipeTaskMeta, - final PipeInsertionEvent sourceEvent) + final PipeInsertionEvent sourceEvent, + final boolean isWithMod) throws IOException { - this(null, 0, tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent); + this(null, 0, tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent, isWithMod); } @Override @@ -265,8 +286,9 @@ private Tablet getNextTablet() { final int rowIndex = tablet.getRowSize(); - tablet.addTimestamp(rowIndex, data.currentTime()); - putValueToColumns(data, tablet, rowIndex); + if (putValueToColumns(data, tablet, rowIndex)) { + tablet.addTimestamp(rowIndex, data.currentTime()); + } } data.next(); @@ -316,13 +338,24 @@ 
private void prepareData() throws IOException { } while (!data.hasCurrent()); } - private void putValueToColumns(final BatchData data, final Tablet tablet, final int rowIndex) { + private boolean putValueToColumns(final BatchData data, final Tablet tablet, final int rowIndex) { + boolean isNeedFillTime = false; if (data.getDataType() == TSDataType.VECTOR) { for (int i = 0; i < tablet.getSchemas().size(); ++i) { final TsPrimitiveType primitiveType = data.getVector()[i]; - if (Objects.isNull(primitiveType)) { + if (Objects.isNull(primitiveType) + || ModsOperationUtil.isDelete(data.currentTime(), modsInfos.get(i))) { + switch (tablet.getSchemas().get(i).getType()) { + case TEXT: + case BLOB: + case STRING: + tablet.addValue(rowIndex, i, Binary.EMPTY_VALUE.getValues()); + } + tablet.getBitMaps()[i].mark(rowIndex); continue; } + + isNeedFillTime = true; switch (tablet.getSchemas().get(i).getType()) { case BOOLEAN: tablet.addValue(rowIndex, i, primitiveType.getBoolean()); @@ -353,6 +386,7 @@ private void putValueToColumns(final BatchData data, final Tablet tablet, final } } } else { + isNeedFillTime = true; switch (tablet.getSchemas().get(0).getType()) { case BOOLEAN: tablet.addValue(rowIndex, 0, data.getBoolean()); @@ -382,6 +416,7 @@ private void putValueToColumns(final BatchData data, final Tablet tablet, final throw new UnSupportedDataTypeException("UnSupported" + data.getDataType()); } } + return isNeedFillTime; } private void moveToNextChunkReader() throws IOException, IllegalStateException { @@ -389,6 +424,7 @@ private void moveToNextChunkReader() throws IOException, IllegalStateException { long valueChunkSize = 0; final List valueChunkList = new ArrayList<>(); currentMeasurements.clear(); + modsInfos.clear(); if (lastMarker == MetaMarker.SEPARATOR) { chunkReader = null; @@ -404,131 +440,191 @@ private void moveToNextChunkReader() throws IOException, IllegalStateException { case MetaMarker.TIME_CHUNK_HEADER: case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER: case 
MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER: - // Notice that the data in one chunk group is either aligned or non-aligned - // There is no need to consider non-aligned chunks when there are value chunks - currentIsMultiPage = marker == MetaMarker.CHUNK_HEADER; + { + // Notice that the data in one chunk group is either aligned or non-aligned + // There is no need to consider non-aligned chunks when there are value chunks + currentIsMultiPage = marker == MetaMarker.CHUNK_HEADER; + long currentChunkHeaderOffset = tsFileSequenceReader.position() - 1; + chunkHeader = tsFileSequenceReader.readChunkHeader(marker); - chunkHeader = tsFileSequenceReader.readChunkHeader(marker); + final long nextMarkerOffset = + tsFileSequenceReader.position() + chunkHeader.getDataSize(); - if (Objects.isNull(currentDevice)) { - tsFileSequenceReader.position( - tsFileSequenceReader.position() + chunkHeader.getDataSize()); - break; - } + if (Objects.isNull(currentDevice)) { + tsFileSequenceReader.position(nextMarkerOffset); + break; + } - if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK) - == TsFileConstant.TIME_COLUMN_MASK) { - timeChunkList.add( - new Chunk( - chunkHeader, tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize()))); - isMultiPageList.add(marker == MetaMarker.TIME_CHUNK_HEADER); - break; - } + if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK) + == TsFileConstant.TIME_COLUMN_MASK) { + timeChunkList.add( + new Chunk( + chunkHeader, tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize()))); + isMultiPageList.add(marker == MetaMarker.TIME_CHUNK_HEADER); + break; + } - if (!treePattern.matchesMeasurement(currentDevice, chunkHeader.getMeasurementID())) { - tsFileSequenceReader.position( - tsFileSequenceReader.position() + chunkHeader.getDataSize()); - break; - } + if (!treePattern.matchesMeasurement(currentDevice, chunkHeader.getMeasurementID())) { + tsFileSequenceReader.position(nextMarkerOffset); + break; + } - if 
(chunkHeader.getDataSize() > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { - PipeDataNodeResourceManager.memory() - .forceResize(allocatedMemoryBlockForChunk, chunkHeader.getDataSize()); - } + // Skip the chunk if it is fully deleted by mods + if (!currentModifications.isEmpty()) { + final Statistics statistics = + findNonAlignedChunkStatistics( + tsFileSequenceReader.getIChunkMetadataList( + currentDevice, chunkHeader.getMeasurementID()), + currentChunkHeaderOffset); + if (statistics != null + && ModsOperationUtil.isAllDeletedByMods( + currentDevice, + chunkHeader.getMeasurementID(), + statistics.getStartTime(), + statistics.getEndTime(), + currentModifications)) { + tsFileSequenceReader.position(nextMarkerOffset); + break; + } + } - chunkReader = - currentIsMultiPage - ? new ChunkReader( - new Chunk( - chunkHeader, - tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize())), - filter) - : new SinglePageWholeChunkReader( - new Chunk( - chunkHeader, - tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize()))); - currentIsAligned = false; - currentMeasurements.add( - new MeasurementSchema(chunkHeader.getMeasurementID(), chunkHeader.getDataType())); - return; + if (chunkHeader.getDataSize() > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { + PipeDataNodeResourceManager.memory() + .forceResize(allocatedMemoryBlockForChunk, chunkHeader.getDataSize()); + } + + Chunk chunk = + new Chunk( + chunkHeader, tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize())); + + chunkReader = + currentIsMultiPage + ? 
new ChunkReader(chunk, filter) + : new SinglePageWholeChunkReader(chunk); + currentIsAligned = false; + currentMeasurements.add( + new MeasurementSchema(chunkHeader.getMeasurementID(), chunkHeader.getDataType())); + modsInfos.addAll( + ModsOperationUtil.initializeMeasurementMods( + currentDevice, + Collections.singletonList(chunkHeader.getMeasurementID()), + currentModifications)); + return; + } case MetaMarker.VALUE_CHUNK_HEADER: case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER: - if (Objects.isNull(firstChunkHeader4NextSequentialValueChunks)) { - chunkHeader = tsFileSequenceReader.readChunkHeader(marker); - - if (Objects.isNull(currentDevice) - || !treePattern.matchesMeasurement(currentDevice, chunkHeader.getMeasurementID())) { - tsFileSequenceReader.position( - tsFileSequenceReader.position() + chunkHeader.getDataSize()); - break; - } + { + if (Objects.isNull(firstChunkHeader4NextSequentialValueChunks)) { + long currentChunkHeaderOffset = tsFileSequenceReader.position() - 1; + chunkHeader = tsFileSequenceReader.readChunkHeader(marker); + + final long nextMarkerOffset = + tsFileSequenceReader.position() + chunkHeader.getDataSize(); + if (Objects.isNull(currentDevice) + || !treePattern.matchesMeasurement( + currentDevice, chunkHeader.getMeasurementID())) { + tsFileSequenceReader.position(nextMarkerOffset); + break; + } - // Increase value index - final int valueIndex = - measurementIndexMap.compute( - chunkHeader.getMeasurementID(), - (measurement, index) -> Objects.nonNull(index) ? 
index + 1 : 0); + if (!currentModifications.isEmpty()) { + // Skip the chunk if it is fully deleted by mods + final Statistics statistics = + findAlignedChunkStatistics( + tsFileSequenceReader.getIChunkMetadataList( + currentDevice, chunkHeader.getMeasurementID()), + currentChunkHeaderOffset); + if (statistics != null + && ModsOperationUtil.isAllDeletedByMods( + currentDevice, + chunkHeader.getMeasurementID(), + statistics.getStartTime(), + statistics.getEndTime(), + currentModifications)) { + tsFileSequenceReader.position(nextMarkerOffset); + break; + } + } - // Emit when encountered non-sequential value chunk, or the chunk size exceeds - // certain value to avoid OOM - // Do not record or end current value chunks when there are empty chunks - if (chunkHeader.getDataSize() == 0) { - break; - } - boolean needReturn = false; - final long timeChunkSize = - lastIndex >= 0 - ? PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunkList.get(lastIndex)) - : 0; - if (lastIndex >= 0) { - if (valueIndex != lastIndex) { - needReturn = recordAlignedChunk(valueChunkList, marker); - } else { - final long chunkSize = timeChunkSize + valueChunkSize; - if (chunkSize + chunkHeader.getDataSize() - > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { - if (valueChunkList.size() == 1 - && chunkSize > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { - PipeDataNodeResourceManager.memory() - .forceResize(allocatedMemoryBlockForChunk, chunkSize); - } + // Increase value index + final int valueIndex = + measurementIndexMap.compute( + chunkHeader.getMeasurementID(), + (measurement, index) -> Objects.nonNull(index) ? index + 1 : 0); + + // Emit when encountered non-sequential value chunk, or the chunk size exceeds + // certain value to avoid OOM + // Do not record or end current value chunks when there are empty chunks + if (chunkHeader.getDataSize() == 0) { + break; + } + boolean needReturn = false; + final long timeChunkSize = + lastIndex >= 0 + ? 
PipeMemoryWeightUtil.calculateChunkRamBytesUsed( + timeChunkList.get(lastIndex)) + : 0; + if (lastIndex >= 0) { + if (valueIndex != lastIndex) { needReturn = recordAlignedChunk(valueChunkList, marker); + } else { + final long chunkSize = timeChunkSize + valueChunkSize; + if (chunkSize + chunkHeader.getDataSize() + > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { + if (valueChunkList.size() == 1 + && chunkSize > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { + PipeDataNodeResourceManager.memory() + .forceResize(allocatedMemoryBlockForChunk, chunkSize); + } + needReturn = recordAlignedChunk(valueChunkList, marker); + } } } + lastIndex = valueIndex; + if (needReturn) { + firstChunkHeader4NextSequentialValueChunks = chunkHeader; + return; + } + } else { + chunkHeader = firstChunkHeader4NextSequentialValueChunks; + firstChunkHeader4NextSequentialValueChunks = null; } - lastIndex = valueIndex; - if (needReturn) { - firstChunkHeader4NextSequentialValueChunks = chunkHeader; - return; - } - } else { - chunkHeader = firstChunkHeader4NextSequentialValueChunks; - firstChunkHeader4NextSequentialValueChunks = null; - } - valueChunkSize += chunkHeader.getDataSize(); - valueChunkList.add( - new Chunk( - chunkHeader, tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize()))); - currentMeasurements.add( - new MeasurementSchema(chunkHeader.getMeasurementID(), chunkHeader.getDataType())); - break; + Chunk chunk = + new Chunk( + chunkHeader, tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize())); + + valueChunkSize += chunkHeader.getDataSize(); + valueChunkList.add(chunk); + currentMeasurements.add( + new MeasurementSchema(chunkHeader.getMeasurementID(), chunkHeader.getDataType())); + modsInfos.addAll( + ModsOperationUtil.initializeMeasurementMods( + currentDevice, + Collections.singletonList(chunkHeader.getMeasurementID()), + currentModifications)); + break; + } case MetaMarker.CHUNK_GROUP_HEADER: - // Return before "currentDevice" changes - if 
(recordAlignedChunk(valueChunkList, marker)) { - return; + { + // Return before "currentDevice" changes + if (recordAlignedChunk(valueChunkList, marker)) { + return; + } + // Clear because the cached data will never be used in the next chunk group + lastIndex = -1; + timeChunkList.clear(); + isMultiPageList.clear(); + measurementIndexMap.clear(); + final IDeviceID deviceID = tsFileSequenceReader.readChunkGroupHeader().getDeviceID(); + currentDevice = treePattern.mayOverlapWithDevice(deviceID) ? deviceID : null; + break; } - // Clear because the cached data will never be used in the next chunk group - lastIndex = -1; - timeChunkList.clear(); - isMultiPageList.clear(); - measurementIndexMap.clear(); - final IDeviceID deviceID = tsFileSequenceReader.readChunkGroupHeader().getDeviceID(); - currentDevice = treePattern.mayOverlapWithDevice(deviceID) ? deviceID : null; - break; case MetaMarker.OPERATION_INDEX_RANGE: - tsFileSequenceReader.readPlanIndex(); - break; + { + tsFileSequenceReader.readPlanIndex(); + break; + } default: MetaMarker.handleUnexpectedMarker(marker); } @@ -569,4 +665,32 @@ public void close() { allocatedMemoryBlockForChunk.close(); } } + + private Statistics findAlignedChunkStatistics( + List metadataList, long currentChunkHeaderOffset) { + for (IChunkMetadata metadata : metadataList) { + if (!(metadata instanceof AlignedChunkMetadata)) { + continue; + } + List list = ((AlignedChunkMetadata) metadata).getValueChunkMetadataList(); + for (IChunkMetadata m : list) { + if (m.getOffsetOfChunkHeader() == currentChunkHeaderOffset) { + return m.getStatistics(); + } + } + break; + } + return null; + } + + private Statistics findNonAlignedChunkStatistics( + List metadataList, long currentChunkHeaderOffset) { + for (IChunkMetadata metadata : metadataList) { + if (metadata.getOffsetOfChunkHeader() == currentChunkHeaderOffset) { + // found the corresponding chunk metadata + return metadata.getStatistics(); + } + } + return null; + } } diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java index 7a5d869aa1a0..77a3a8916c9c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java @@ -28,9 +28,11 @@ import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParser; +import org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName; +import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.exception.PipeException; @@ -49,6 +51,7 @@ public class TsFileInsertionEventTableParser extends TsFileInsertionEventParser private final long endTime; private final TablePattern tablePattern; private final String userName; + private final boolean isWithMod; private final PipeMemoryBlock allocatedMemoryBlockForBatchData; private final PipeMemoryBlock allocatedMemoryBlockForChunk; @@ -64,11 +67,20 @@ public TsFileInsertionEventTableParser( final long endTime, final PipeTaskMeta pipeTaskMeta, final String userName, - final PipeInsertionEvent sourceEvent) + final PipeInsertionEvent sourceEvent, + final boolean isWithMod) throws IOException { super(pipeName, creationTime, null, pattern, 
startTime, endTime, pipeTaskMeta, sourceEvent); + this.isWithMod = isWithMod; try { + currentModifications = + isWithMod + ? ModsOperationUtil.loadModificationsFromTsFile(tsFile) + : PatternTreeMapFactory.getModsPatternTreeMap(); + allocatedMemoryBlockForModifications = + PipeDataNodeResourceManager.memory() + .forceAllocateForTabletWithRetry(currentModifications.ramBytesUsed()); long tableSize = Math.min( PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes(), @@ -106,9 +118,20 @@ public TsFileInsertionEventTableParser( final long endTime, final PipeTaskMeta pipeTaskMeta, final String userName, - final PipeInsertionEvent sourceEvent) + final PipeInsertionEvent sourceEvent, + final boolean isWithMod) throws IOException { - this(null, 0, tsFile, pattern, startTime, endTime, pipeTaskMeta, userName, sourceEvent); + this( + null, + 0, + tsFile, + pattern, + startTime, + endTime, + pipeTaskMeta, + userName, + sourceEvent, + isWithMod); } @Override @@ -136,6 +159,7 @@ && hasTablePrivilege(entry.getKey()), allocatedMemoryBlockForChunk, allocatedMemoryBlockForChunkMeta, allocatedMemoryBlockForTableSchemas, + currentModifications, startTime, endTime); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java index 746d9d5b4a01..f05cf872c798 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java @@ -19,9 +19,13 @@ package org.apache.iotdb.db.pipe.event.common.tsfile.parser.table; +import org.apache.iotdb.commons.path.PatternTreeMap; +import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil; +import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; +import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.tsfile.enums.ColumnCategory; @@ -75,6 +79,9 @@ public class TsFileInsertionEventTableParserTabletIterator implements Iterator modifications; + // Used to read tsfile data private IChunkReader chunkReader; private BatchData batchData; @@ -96,6 +103,8 @@ public class TsFileInsertionEventTableParserTabletIterator implements Iterator dataTypeList; private int deviceIdSize; + private List modsInfoList; + // Used to record whether the same Tablet is generated when parsing starts. Different table // information cannot be placed in the same Tablet. 
private boolean isSameTableName; @@ -109,12 +118,14 @@ public TsFileInsertionEventTableParserTabletIterator( final PipeMemoryBlock allocatedMemoryBlockForChunk, final PipeMemoryBlock allocatedMemoryBlockForChunkMeta, final PipeMemoryBlock allocatedMemoryBlockForTableSchema, + final PatternTreeMap modifications, final long startTime, final long endTime) throws IOException { this.startTime = startTime; this.endTime = endTime; + this.modifications = modifications; this.reader = tsFileSequenceReader; this.metadataQuerier = new MetadataQuerierByFileImpl(reader); @@ -202,6 +213,31 @@ public boolean hasNext() { continue; } + Iterator iChunkMetadataIterator = + alignedChunkMetadata.getValueChunkMetadataList().iterator(); + while (iChunkMetadataIterator.hasNext()) { + IChunkMetadata iChunkMetadata = iChunkMetadataIterator.next(); + if (iChunkMetadata == null) { + iChunkMetadataIterator.remove(); + continue; + } + + if (!modifications.isEmpty() + && ModsOperationUtil.isAllDeletedByMods( + pair.getLeft(), + iChunkMetadata.getMeasurementUid(), + alignedChunkMetadata.getStartTime(), + alignedChunkMetadata.getEndTime(), + modifications)) { + iChunkMetadataIterator.remove(); + } + } + + if (alignedChunkMetadata.getValueChunkMetadataList().isEmpty()) { + chunkMetadataIterator.remove(); + continue; + } + size += PipeMemoryWeightUtil.calculateAlignedChunkMetaBytesUsed(alignedChunkMetadata); if (allocatedMemoryBlockForChunkMeta.getMemoryUsageInBytes() < size) { @@ -307,9 +343,10 @@ private Tablet buildNextTablet() { break; } - tablet.addTimestamp(rowIndex, batchData.currentTime()); - fillMeasurementValueColumns(batchData, tablet, rowIndex); - fillDeviceIdColumns(deviceID, tablet, rowIndex); + if (fillMeasurementValueColumns(batchData, tablet, rowIndex)) { + fillDeviceIdColumns(deviceID, tablet, rowIndex); + tablet.addTimestamp(rowIndex, batchData.currentTime()); + } } if (batchData != null) { @@ -386,15 +423,19 @@ private void initChunkReader(final AbstractAlignedChunkMetadata 
alignedChunkMeta } this.chunkReader = new TableChunkReader(timeChunk, valueChunkList, null); + this.modsInfoList = + ModsOperationUtil.initializeMeasurementMods(deviceID, measurementList, modifications); } - private void fillMeasurementValueColumns( + private boolean fillMeasurementValueColumns( final BatchData data, final Tablet tablet, final int rowIndex) { final TsPrimitiveType[] primitiveTypes = data.getVector(); + boolean needFillTime = false; for (int i = deviceIdSize, size = dataTypeList.size(); i < size; i++) { final TsPrimitiveType primitiveType = primitiveTypes[i - deviceIdSize]; - if (primitiveType == null) { + if (primitiveType == null + || ModsOperationUtil.isDelete(data.currentTime(), modsInfoList.get(i))) { switch (dataTypeList.get(i)) { case TEXT: case BLOB: @@ -404,6 +445,7 @@ private void fillMeasurementValueColumns( tablet.getBitMaps()[i].mark(rowIndex); continue; } + needFillTime = true; switch (dataTypeList.get(i)) { case BOOLEAN: @@ -438,6 +480,7 @@ private void fillMeasurementValueColumns( throw new UnSupportedDataTypeException("UnSupported" + primitiveType.getDataType()); } } + return needFillTime; } private void fillDeviceIdColumns( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtil.java new file mode 100644 index 000000000000..66fed43feade --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtil.java @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.event.common.tsfile.parser.util; + +import org.apache.iotdb.commons.path.PatternTreeMap; +import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; +import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; +import org.apache.iotdb.db.utils.ModificationUtils; +import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory; +import org.apache.iotdb.pipe.api.exception.PipeException; + +import org.apache.tsfile.file.metadata.IDeviceID; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * Utility class for handling mods operations during TsFile parsing. Supports mods processing logic + * for both tree model and table model. 
+ */ +public class ModsOperationUtil { + + private ModsOperationUtil() { + // Utility class, no instantiation allowed + } + + /** + * Load all modifications from TsFile and build PatternTreeMap + * + * @param tsFile TsFile file + * @return PatternTreeMap containing all modifications + */ + public static PatternTreeMap + loadModificationsFromTsFile(File tsFile) { + PatternTreeMap modifications = + PatternTreeMapFactory.getModsPatternTreeMap(); + + try { + ModificationFile.readAllModifications(tsFile, true) + .forEach( + modification -> modifications.append(modification.keyOfPatternTree(), modification)); + } catch (Exception e) { + throw new PipeException("Failed to load modifications from TsFile: " + tsFile.getPath(), e); + } + + return modifications; + } + + /** + * Check if data in the specified time range is completely deleted by mods Different logic for + * tree model and table model + * + * @param deviceID device ID + * @param measurementID measurement ID + * @param startTime start time + * @param endTime end time + * @param modifications modification records + * @return true if data is completely deleted, false otherwise + */ + public static boolean isAllDeletedByMods( + IDeviceID deviceID, + String measurementID, + long startTime, + long endTime, + PatternTreeMap modifications) { + if (modifications == null) { + return false; + } + + final List mods = modifications.getOverlapped(deviceID, measurementID); + if (mods == null || mods.isEmpty()) { + return false; + } + + // Different logic for tree model and table model + if (deviceID.isTableModel()) { + // For table model: check if any modification affects the device and covers the time range + return mods.stream() + .anyMatch( + modification -> + modification.getTimeRange().contains(startTime, endTime) + && modification.affects(deviceID) + && modification.affects(measurementID)); + } else { + // For tree model: check if any modification covers the time range + return mods.stream() + .anyMatch(modification -> 
modification.getTimeRange().contains(startTime, endTime)); + } + } + + /** + * Initialize mods mapping for specified measurement list + * + * @param deviceID device ID + * @param measurements measurement list + * @param modifications modification records + * @return mapping from measurement ID to mods list and index + */ + public static List initializeMeasurementMods( + IDeviceID deviceID, + List measurements, + PatternTreeMap modifications) { + + List modsInfos = new ArrayList<>(measurements.size()); + + for (final String measurement : measurements) { + final List mods = modifications.getOverlapped(deviceID, measurement); + if (mods == null || mods.isEmpty()) { + // No mods, use empty list and index 0 + modsInfos.add(new ModsInfo(Collections.emptyList(), 0)); + continue; + } + + // Sort by time range for efficient lookup + // Different filtering logic for tree model and table model + final List filteredMods; + if (deviceID.isTableModel()) { + // For table model: filter modifications that affect the device + filteredMods = + mods.stream() + .filter( + modification -> + modification.affects(deviceID) && modification.affects(measurement)) + .collect(Collectors.toList()); + } else { + // For tree model: no additional filtering needed + filteredMods = mods; + } + // Store sorted mods and start index + modsInfos.add(new ModsInfo(ModificationUtils.sortAndMerge(filteredMods), 0)); + } + + return modsInfos; + } + + /** + * Check if data at the specified time point is deleted + * + * @param time time point + * @param modsInfo mods information containing mods list and current index + * @return true if data is deleted, false otherwise + */ + public static boolean isDelete(long time, ModsInfo modsInfo) { + if (modsInfo == null) { + return false; + } + + final List mods = modsInfo.getMods(); + if (mods == null || mods.isEmpty()) { + return false; + } + + int currentIndex = modsInfo.getCurrentIndex(); + if (currentIndex < 0) { + return false; + } + + // First, try to use the 
current index if it's valid + if (currentIndex < mods.size()) { + final ModEntry currentMod = mods.get(currentIndex); + final long currentModStartTime = currentMod.getTimeRange().getMin(); + final long currentModEndTime = currentMod.getTimeRange().getMax(); + + if (time < currentModStartTime) { + // Time is before current mod, return false + return false; + } else if (time <= currentModEndTime) { + // Time is within current mod range, return true + return true; + } else { + // Time is after current mod, need to search forwards + return searchAndCheckMod(mods, time, currentIndex + 1, modsInfo); + } + } else { + // Current index is beyond array bounds, all mods have been processed + clearModsAndReset(modsInfo); + return false; + } + } + + /** + * Search for a mod using binary search and check if the time point is deleted + * + * @param mods sorted list of mods + * @param time time point to search for + * @param startIndex starting index for search + * @param modsInfo mods information to update + * @return true if data is deleted, false otherwise + */ + private static boolean searchAndCheckMod( + List mods, long time, int startIndex, ModsInfo modsInfo) { + int searchIndex = binarySearchMods(mods, time, startIndex); + if (searchIndex >= mods.size()) { + // All mods checked, clear mods list and reset index to 0 + clearModsAndReset(modsInfo); + return false; + } + + final ModEntry foundMod = mods.get(searchIndex); + final long foundModStartTime = foundMod.getTimeRange().getMin(); + + if (time < foundModStartTime) { + modsInfo.setCurrentIndex(searchIndex); + return false; + } + + modsInfo.setCurrentIndex(searchIndex); + return true; + } + + /** + * Clear mods list and reset index to 0 + * + * @param modsInfo mods information to update + */ + private static void clearModsAndReset(ModsInfo modsInfo) { + modsInfo.setMods(Collections.emptyList()); + modsInfo.setCurrentIndex(0); + } + + /** + * Binary search to find the first mod that might contain the given time point. 
Returns the index + * of the first mod where modStartTime <= time, or mods.size() if no such mod exists. + * + * @param mods sorted list of mods + * @param time time point to search for + * @param startIndex starting index for search (current index) + * @return index of the first potential mod, or mods.size() if none found + */ + private static int binarySearchMods(List mods, long time, int startIndex) { + int left = startIndex; + int right = mods.size(); + + while (left < right) { + int mid = left + (right - left) / 2; + final long max = mods.get(mid).getTimeRange().getMax(); + + if (max < time) { + left = mid + 1; + } else { + right = mid; + } + } + + return left; + } + + /** Mods information wrapper class, containing mods list and current index */ + public static class ModsInfo { + private List mods; + private int currentIndex; + + public ModsInfo(List mods, int currentIndex) { + this.mods = Objects.requireNonNull(mods); + this.currentIndex = currentIndex; + } + + public List getMods() { + return mods; + } + + public void setMods(List newMods) { + this.mods = newMods; + } + + public int getCurrentIndex() { + return currentIndex; + } + + public void setCurrentIndex(int newIndex) { + this.currentIndex = newIndex; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ModsInfo modsInfo = (ModsInfo) o; + return Objects.equals(mods, modsInfo.mods) + && Objects.equals(currentIndex, modsInfo.currentIndex); + } + + @Override + public int hashCode() { + return Objects.hash(mods, currentIndex); + } + + @Override + public String toString() { + return "ModsInfo{" + "mods=" + mods + ", currentIndex=" + currentIndex + '}'; + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java index 5fb87e550dfc..2ddcdd27f62d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java @@ -133,7 +133,8 @@ public Optional visitLoadFile( Long.MAX_VALUE, null, "root", - null)) { + null, + true)) { for (final TabletInsertionEvent tabletInsertionEvent : parser.toTabletInsertionEvents()) { if (!(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) { continue; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java index 77bbf49094c3..8541e9bc9d02 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java @@ -102,7 +102,7 @@ public Optional visitLoadFile( for (final File file : loadTsFileStatement.getTsFiles()) { try (final TsFileInsertionEventScanParser parser = new TsFileInsertionEventScanParser( - file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) { + file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null, true)) { for (final Pair tabletWithIsAligned : parser.toTabletWithIsAligneds()) { final PipeConvertedInsertTabletStatement statement = new PipeConvertedInsertTabletStatement( diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java index 667a2faf0de7..eca63e40356e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java @@ -83,7 +83,8 @@ public Optional visitLoadTsFile( Long.MAX_VALUE, null, "root", - null)) { + null, + true)) { for (final TabletInsertionEvent tabletInsertionEvent : parser.toTabletInsertionEvents()) { if (!(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) { continue; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java index 6dd851d1d9d9..d3ad0ae0a34b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java @@ -93,7 +93,13 @@ public Optional visitLoadFile( for (final File file : loadTsFileStatement.getTsFiles()) { try (final TsFileInsertionEventScanParser parser = new TsFileInsertionEventScanParser( - file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) { + file, + new IoTDBTreePattern(null), + Long.MIN_VALUE, + Long.MAX_VALUE, + null, + null, + true)) { for (final Pair tabletWithIsAligned : parser.toTabletWithIsAligneds()) { final PipeTransferTabletRawReq 
tabletRawReq = PipeTransferTabletRawReq.toTPipeTransferRawReq( diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java index 9c58ae9f0f68..7bfde3b158d5 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java @@ -584,7 +584,7 @@ private void testTsFilePointNum( ? new TsFileInsertionEventQueryParser( tsFile, pattern, startTime, endTime, tsFileInsertionEvent) : new TsFileInsertionEventScanParser( - tsFile, pattern, startTime, endTime, null, tsFileInsertionEvent)) { + tsFile, pattern, startTime, endTime, null, tsFileInsertionEvent, false)) { final AtomicInteger count1 = new AtomicInteger(0); final AtomicInteger count2 = new AtomicInteger(0); final AtomicInteger count3 = new AtomicInteger(0); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtilTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtilTest.java new file mode 100644 index 000000000000..7eb09bbab41a --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtilTest.java @@ -0,0 +1,408 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.event.common.tsfile.parser.util; + +import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; +import org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry; + +import org.apache.tsfile.read.common.TimeRange; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class ModsOperationUtilTest { + + @Test + public void testIsDeleteWithBinarySearch() { + // Create test mods with time ranges: [10, 20], [30, 40], [50, 60], [70, 80] + List mods = createTestMods(new long[][] {{10, 20}, {30, 40}, {50, 60}, {70, 80}}); + + ModsOperationUtil.ModsInfo modsInfo = new ModsOperationUtil.ModsInfo(mods, 0); + + // Test cases + // Time 5: before first mod, should return false + assertFalse(ModsOperationUtil.isDelete(5, modsInfo)); + assertEquals(0, modsInfo.getCurrentIndex()); + + // Time 15: within first mod [10, 20], should return true + assertTrue(ModsOperationUtil.isDelete(15, modsInfo)); + assertEquals(0, modsInfo.getCurrentIndex()); + + // Time 25: between first and second mod, should return false + assertFalse(ModsOperationUtil.isDelete(25, modsInfo)); + assertEquals(1, modsInfo.getCurrentIndex()); + + // Time 35: within second mod [30, 40], should return true + assertTrue(ModsOperationUtil.isDelete(35, modsInfo)); + assertEquals(1, modsInfo.getCurrentIndex()); + + // Time 45: between second and third mod, should 
return false + assertFalse(ModsOperationUtil.isDelete(45, modsInfo)); + assertEquals(2, modsInfo.getCurrentIndex()); + + // Time 55: within third mod [50, 60], should return true + assertTrue(ModsOperationUtil.isDelete(55, modsInfo)); + assertEquals(2, modsInfo.getCurrentIndex()); + + // Time 65: between third and fourth mod, should return false + assertFalse(ModsOperationUtil.isDelete(65, modsInfo)); + assertEquals(3, modsInfo.getCurrentIndex()); + + // Time 75: within fourth mod [70, 80], should return true + assertTrue(ModsOperationUtil.isDelete(75, modsInfo)); + assertEquals(3, modsInfo.getCurrentIndex()); + + // Time 85: after last mod, should return false and clear mods + assertFalse(ModsOperationUtil.isDelete(85, modsInfo)); + assertTrue(modsInfo.getMods().isEmpty()); + assertEquals(0, modsInfo.getCurrentIndex()); + } + + @Test + public void testIsDeleteWithEmptyMods() { + ModsOperationUtil.ModsInfo modsInfo = new ModsOperationUtil.ModsInfo(new ArrayList<>(), 0); + + // Should return false for any time when mods is empty + assertFalse(ModsOperationUtil.isDelete(100, modsInfo)); + } + + @Test + public void testIsDeleteWithNullModsInfo() { + // Should return false when modsInfo is null + assertFalse(ModsOperationUtil.isDelete(100, null)); + } + + @Test + public void testIsDeleteWithNegativeIndex() { + List mods = createTestMods(new long[][] {{10, 20}}); + ModsOperationUtil.ModsInfo modsInfo = new ModsOperationUtil.ModsInfo(mods, -1); + + // Should return false when currentIndex is negative + assertFalse(ModsOperationUtil.isDelete(15, modsInfo)); + } + + @Test + public void testIsDeleteWithSingleMod() { + List mods = createTestMods(new long[][] {{10, 20}}); + ModsOperationUtil.ModsInfo modsInfo = new ModsOperationUtil.ModsInfo(mods, 0); + + // Time before mod + assertFalse(ModsOperationUtil.isDelete(5, modsInfo)); + assertEquals(0, modsInfo.getCurrentIndex()); + + // Time within mod + assertTrue(ModsOperationUtil.isDelete(15, modsInfo)); + assertEquals(0, 
modsInfo.getCurrentIndex()); + + // Time after mod + assertFalse(ModsOperationUtil.isDelete(25, modsInfo)); + assertTrue(modsInfo.getMods().isEmpty()); + assertEquals(0, modsInfo.getCurrentIndex()); + } + + @Test + public void testIsDeleteWithOverlappingMods() { + // Create overlapping mods: [10, 30], [20, 40], [30, 50] + List mods = createTestMods(new long[][] {{10, 30}, {20, 40}, {30, 50}}); + + ModsOperationUtil.ModsInfo modsInfo = new ModsOperationUtil.ModsInfo(mods, 0); + + // Time 15: within first mod + assertTrue(ModsOperationUtil.isDelete(15, modsInfo)); + assertEquals(0, modsInfo.getCurrentIndex()); + + // Time 25: within both first and second mod, should find first one + assertTrue(ModsOperationUtil.isDelete(25, modsInfo)); + assertEquals(0, modsInfo.getCurrentIndex()); + + // Time 35: within second and third mod, should find second one + assertTrue(ModsOperationUtil.isDelete(35, modsInfo)); + assertEquals(1, modsInfo.getCurrentIndex()); + + // Time 45: within third mod + assertTrue(ModsOperationUtil.isDelete(45, modsInfo)); + assertEquals(2, modsInfo.getCurrentIndex()); + } + + @Test + public void testBinarySearchMods() { + // Create test mods with time ranges: [10, 20], [30, 40], [50, 60], [70, 80] + List mods = createTestMods(new long[][] {{10, 20}, {30, 40}, {50, 60}, {70, 80}}); + + // Test binary search from start index 0 + // Time 5: before all mods, should return 0 (first mod) + assertEquals(0, binarySearchMods(mods, 5, 0)); + + // Time 15: within first mod [10, 20], should return 0 + assertEquals(0, binarySearchMods(mods, 15, 0)); + + // Time 25: between first and second mod, should return 1 + assertEquals(1, binarySearchMods(mods, 25, 0)); + + // Time 35: within second mod [30, 40], should return 1 + assertEquals(1, binarySearchMods(mods, 35, 0)); + + // Time 45: between second and third mod, should return 2 + assertEquals(2, binarySearchMods(mods, 45, 0)); + + // Time 55: within third mod [50, 60], should return 2 + assertEquals(2, 
binarySearchMods(mods, 55, 0)); + + // Time 65: between third and fourth mod, should return 3 + assertEquals(3, binarySearchMods(mods, 65, 0)); + + // Time 75: within fourth mod [70, 80], should return 3 + assertEquals(3, binarySearchMods(mods, 75, 0)); + + // Time 85: after all mods, should return 4 (mods.size()) + assertEquals(4, binarySearchMods(mods, 85, 0)); + } + + @Test + public void testBinarySearchModsWithStartIndex() { + // Create test mods with time ranges: [10, 20], [30, 40], [50, 60], [70, 80] + List mods = createTestMods(new long[][] {{10, 20}, {30, 40}, {50, 60}, {70, 80}}); + + // Test binary search starting from index 2 + // Time 15: before start index, should return 2 (start index) + assertEquals(2, binarySearchMods(mods, 15, 2)); + + // Time 35: before start index, should return 2 (start index) + assertEquals(2, binarySearchMods(mods, 35, 2)); + + // Time 55: within mod at index 2, should return 2 + assertEquals(2, binarySearchMods(mods, 55, 2)); + + // Time 65: between mods at index 2 and 3, should return 3 + assertEquals(3, binarySearchMods(mods, 65, 2)); + + // Time 75: within mod at index 3, should return 3 + assertEquals(3, binarySearchMods(mods, 75, 2)); + + // Time 85: after all mods, should return 4 (mods.size()) + assertEquals(4, binarySearchMods(mods, 85, 2)); + } + + @Test + public void testBinarySearchModsWithOverlappingRanges() { + // Create overlapping mods: [10, 30], [20, 40], [30, 50] + List mods = createTestMods(new long[][] {{10, 30}, {20, 40}, {30, 50}}); + + // Time 15: within first mod, should return 0 + assertEquals(0, binarySearchMods(mods, 15, 0)); + + // Time 25: within first and second mod, should return 0 (first match) + assertEquals(0, binarySearchMods(mods, 25, 0)); + + // Time 35: within second and third mod, should return 1 (first match from start) + assertEquals(1, binarySearchMods(mods, 35, 0)); + + // Time 45: within third mod, should return 2 + assertEquals(2, binarySearchMods(mods, 45, 0)); + } + + @Test + 
public void testBinarySearchModsWithEmptyList() { + List mods = new ArrayList<>(); + + // Should return 0 for any time when mods is empty + assertEquals(0, binarySearchMods(mods, 100, 0)); + } + + @Test + public void testBinarySearchModsWithSingleMod() { + List mods = createTestMods(new long[][] {{10, 20}}); + + // Time 5: before mod, should return 0 + assertEquals(0, binarySearchMods(mods, 5, 0)); + + // Time 15: within mod, should return 0 + assertEquals(0, binarySearchMods(mods, 15, 0)); + + // Time 25: after mod, should return 1 + assertEquals(1, binarySearchMods(mods, 25, 0)); + } + + @Test + public void testBinarySearchModsWithExactBoundaries() { + // Create mods with exact boundaries: [10, 20], [20, 30], [30, 40] + List mods = createTestMods(new long[][] {{10, 20}, {20, 30}, {30, 40}}); + + // Time 20: at boundary, first mod [10, 20] has endTime=20 >= 20, should return 0 + assertEquals(0, binarySearchMods(mods, 20, 0)); + + // Time 30: at boundary, second mod [20, 30] has endTime=30 >= 30, should return 1 + assertEquals(1, binarySearchMods(mods, 30, 0)); + } + + @Test + public void testBinarySearchModsWithMinBoundaries() { + // Create mods: [10, 20], [30, 40], [50, 60] + List mods = createTestMods(new long[][] {{10, 20}, {30, 40}, {50, 60}}); + + // Time 10: exactly at first mod's min, should return 0 + assertEquals(0, binarySearchMods(mods, 10, 0)); + + // Time 30: exactly at second mod's min, should return 1 + assertEquals(1, binarySearchMods(mods, 30, 0)); + + // Time 50: exactly at third mod's min, should return 2 + assertEquals(2, binarySearchMods(mods, 50, 0)); + } + + @Test + public void testBinarySearchModsWithMaxBoundaries() { + // Create mods: [10, 20], [30, 40], [50, 60] + List mods = createTestMods(new long[][] {{10, 20}, {30, 40}, {50, 60}}); + + // Time 20: exactly at first mod's max, should return 0 + assertEquals(0, binarySearchMods(mods, 20, 0)); + + // Time 40: exactly at second mod's max, should return 1 + assertEquals(1, 
binarySearchMods(mods, 40, 0)); + + // Time 60: exactly at third mod's max, should return 2 + assertEquals(2, binarySearchMods(mods, 60, 0)); + } + + @Test + public void testBinarySearchModsWithJustBeforeMin() { + // Create mods: [10, 20], [30, 40], [50, 60] + List mods = createTestMods(new long[][] {{10, 20}, {30, 40}, {50, 60}}); + + // Time 9: just before first mod's min, should return 0 (first mod) + assertEquals(0, binarySearchMods(mods, 9, 0)); + + // Time 29: just before second mod's min, should return 1 (second mod) + assertEquals(1, binarySearchMods(mods, 29, 0)); + + // Time 49: just before third mod's min, should return 2 (third mod) + assertEquals(2, binarySearchMods(mods, 49, 0)); + } + + @Test + public void testBinarySearchModsWithJustAfterMax() { + // Create mods: [10, 20], [30, 40], [50, 60] + List mods = createTestMods(new long[][] {{10, 20}, {30, 40}, {50, 60}}); + + // Time 21: just after first mod's max, should return 1 (second mod) + assertEquals(1, binarySearchMods(mods, 21, 0)); + + // Time 41: just after second mod's max, should return 2 (third mod) + assertEquals(2, binarySearchMods(mods, 41, 0)); + + // Time 61: just after third mod's max, should return 3 (mods.size()) + assertEquals(3, binarySearchMods(mods, 61, 0)); + } + + @Test + public void testBinarySearchModsWithLargeGaps() { + // Create mods with large gaps: [10, 20], [100, 200], [1000, 2000] + List mods = createTestMods(new long[][] {{10, 20}, {100, 200}, {1000, 2000}}); + + // Time 50: in large gap, should return 1 (second mod) + assertEquals(1, binarySearchMods(mods, 50, 0)); + + // Time 500: in large gap, should return 2 (third mod) + assertEquals(2, binarySearchMods(mods, 500, 0)); + + // Time 5000: after all mods, should return 3 (mods.size()) + assertEquals(3, binarySearchMods(mods, 5000, 0)); + } + + @Test + public void testBinarySearchModsWithNegativeTime() { + // Create mods: [10, 20], [30, 40] + List mods = createTestMods(new long[][] {{10, 20}, {30, 40}}); + + // Time 
-10: negative time, should return 0 (first mod) + assertEquals(0, binarySearchMods(mods, -10, 0)); + + // Time 0: zero time, should return 0 (first mod) + assertEquals(0, binarySearchMods(mods, 0, 0)); + } + + @Test + public void testBinarySearchModsWithVeryLargeTime() { + // Create mods: [10, 20], [30, 40] + List mods = createTestMods(new long[][] {{10, 20}, {30, 40}}); + + // Time Long.MAX_VALUE: very large time, should return 2 (mods.size()) + assertEquals(2, binarySearchMods(mods, Long.MAX_VALUE, 0)); + + // Time Long.MIN_VALUE: very small time, should return 0 (first mod) + assertEquals(0, binarySearchMods(mods, Long.MIN_VALUE, 0)); + } + + @Test + public void testBinarySearchModsWithDuplicateTimeRanges() { + // Create mods with duplicate time ranges: [10, 20], [10, 20], [30, 40] + List mods = createTestMods(new long[][] {{10, 20}, {10, 20}, {30, 40}}); + + // Time 15: within duplicate ranges, should return 0 (first match) + assertEquals(0, binarySearchMods(mods, 15, 0)); + + // Time 20: at max of duplicate ranges, should return 0 (first match) + assertEquals(0, binarySearchMods(mods, 20, 0)); + } + + @Test + public void testBinarySearchModsWithStartIndexAtEnd() { + // Create mods: [10, 20], [30, 40], [50, 60] + List mods = createTestMods(new long[][] {{10, 20}, {30, 40}, {50, 60}}); + + // Start from index 3 (beyond array), should return 3 + assertEquals(3, binarySearchMods(mods, 100, 3)); + + // Start from index 2, search for time 55, should return 2 + assertEquals(2, binarySearchMods(mods, 55, 2)); + + // Start from index 2, search for time 25, should return 2 (start index) + assertEquals(2, binarySearchMods(mods, 25, 2)); + } + + // Helper method to access the private binarySearchMods method for testing + private int binarySearchMods(List mods, long time, int startIndex) { + // Use reflection to access the private method + try { + java.lang.reflect.Method method = + ModsOperationUtil.class.getDeclaredMethod( + "binarySearchMods", List.class, long.class, 
int.class); + method.setAccessible(true); + return (Integer) method.invoke(null, mods, time, startIndex); + } catch (Exception e) { + throw new RuntimeException("Failed to invoke binarySearchMods method", e); + } + } + + private List createTestMods(long[][] timeRanges) { + List mods = new ArrayList<>(); + for (long[] range : timeRanges) { + TreeDeletionEntry mod = new TreeDeletionEntry(null, new TimeRange(range[0], range[1])); + mods.add(mod); + } + return mods; + } +}