diff --git a/lsm/pom.xml b/lsm/pom.xml new file mode 100644 index 000000000000..081c60b0abc7 --- /dev/null +++ b/lsm/pom.xml @@ -0,0 +1,28 @@ + + + + + iotdb-parent + org.apache.iotdb + 0.14.0-SNAPSHOT + ../pom.xml + + 4.0.0 + iotdb-lsm + IoTDB lsm + diff --git a/lsm/src/main/java/org/apache/iotdb/lsm/context/Context.java b/lsm/src/main/java/org/apache/iotdb/lsm/context/Context.java new file mode 100644 index 000000000000..3f369b301773 --- /dev/null +++ b/lsm/src/main/java/org/apache/iotdb/lsm/context/Context.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.lsm.context; + +import org.apache.iotdb.lsm.strategy.AccessStrategy; +import org.apache.iotdb.lsm.strategy.PreOrderAccessStrategy; + +public class Context { + + // 类型 + ContextType type; + + // 访问策略 + AccessStrategy accessStrategy; + + // 所处的树深度 + int level; + + // 多少个线程处理该节点的子节点 + int threadNums; + + // 上界,大于该值的层级不会被处理 + int levelUpperBound; + + // 返回值 + Object result; + + // 是否正在recover + boolean recover; + + public Context() { + accessStrategy = new PreOrderAccessStrategy(); + type = ContextType.NONE; + level = 0; + threadNums = 1; + levelUpperBound = Integer.MAX_VALUE; + recover = false; + } + + public void setLevel(int level) { + this.level = level; + } + + public int getLevel() { + return level; + } + + public ContextType getType() { + return type; + } + + public int getThreadNums() { + return threadNums; + } + + public void setType(ContextType type) { + this.type = type; + } + + public void setThreadNums(int threadNums) { + this.threadNums = threadNums; + } + + public Object getResult() { + return result; + } + + public void setResult(Object result) { + this.result = result; + } + + public AccessStrategy getAccessStrategy() { + return accessStrategy; + } + + public void setAccessStrategy(AccessStrategy accessStrategy) { + this.accessStrategy = accessStrategy; + } + + public int getLevelUpperBound() { + return levelUpperBound; + } + + public void setLevelUpperBound(int levelUpperBound) { + this.levelUpperBound = levelUpperBound; + } + + public boolean isRecover() { + return recover; + } + + public void setRecover(boolean recover) { + this.recover = recover; + } +} diff --git a/lsm/src/main/java/org/apache/iotdb/lsm/context/ContextType.java b/lsm/src/main/java/org/apache/iotdb/lsm/context/ContextType.java new file mode 100644 index 000000000000..3ff2e3509afb --- /dev/null +++ b/lsm/src/main/java/org/apache/iotdb/lsm/context/ContextType.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.lsm.context; + +public enum ContextType { + NONE, + INSERT, + QUERY, + DELETE, + FLUSH; +} diff --git a/lsm/src/main/java/org/apache/iotdb/lsm/context/DeleteContext.java b/lsm/src/main/java/org/apache/iotdb/lsm/context/DeleteContext.java new file mode 100644 index 000000000000..9b4b30ac41a7 --- /dev/null +++ b/lsm/src/main/java/org/apache/iotdb/lsm/context/DeleteContext.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.lsm.context; + +import org.apache.iotdb.lsm.strategy.PostOrderAccessStrategy; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class DeleteContext extends Context { + + List keys; + + Object value; + + public DeleteContext() { + super(); + type = ContextType.DELETE; + accessStrategy = new PostOrderAccessStrategy(); + } + + public DeleteContext(Object value, Object... ks) { + super(); + this.value = value; + keys = new ArrayList<>(); + keys.addAll(Arrays.asList(ks)); + type = ContextType.DELETE; + accessStrategy = new PostOrderAccessStrategy(); + } + + public void setKeys(List keys) { + this.keys = keys; + } + + public void setValue(Object value) { + this.value = value; + } + + public Object getKey() { + return keys.get(level); + } + + public List getKeys() { + return keys; + } + + public Object getValue() { + return value; + } + + public int size() { + return keys.size(); + } +} diff --git a/lsm/src/main/java/org/apache/iotdb/lsm/context/FlushContext.java b/lsm/src/main/java/org/apache/iotdb/lsm/context/FlushContext.java new file mode 100644 index 000000000000..2fd266396c8f --- /dev/null +++ b/lsm/src/main/java/org/apache/iotdb/lsm/context/FlushContext.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.lsm.context; + +import org.apache.iotdb.lsm.strategy.RBFSAccessStrategy; + +public class FlushContext extends Context { + public FlushContext() { + accessStrategy = new RBFSAccessStrategy(); + } +} diff --git a/lsm/src/main/java/org/apache/iotdb/lsm/context/InsertContext.java b/lsm/src/main/java/org/apache/iotdb/lsm/context/InsertContext.java new file mode 100644 index 000000000000..b09d1540cbc4 --- /dev/null +++ b/lsm/src/main/java/org/apache/iotdb/lsm/context/InsertContext.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.lsm.context; + +import org.apache.iotdb.lsm.strategy.PreOrderAccessStrategy; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class InsertContext extends Context { + + List keys; + + Object value; + + public InsertContext() { + super(); + type = ContextType.INSERT; + accessStrategy = new PreOrderAccessStrategy(); + } + + public InsertContext(Object value, Object... keys) { + super(); + this.value = value; + this.keys = new ArrayList<>(); + this.keys.addAll(Arrays.asList(keys)); + type = ContextType.INSERT; + accessStrategy = new PreOrderAccessStrategy(); + } + + public Object getKey() { + return keys.get(level); + } + + public void setKeys(List keys) { + this.keys = keys; + } + + public void setValue(Object value) { + this.value = value; + } + + public List getKeys() { + return keys; + } + + public Object getValue() { + return value; + } +} diff --git a/lsm/src/main/java/org/apache/iotdb/lsm/context/QueryContext.java b/lsm/src/main/java/org/apache/iotdb/lsm/context/QueryContext.java new file mode 100644 index 000000000000..7221b643f7fa --- /dev/null +++ b/lsm/src/main/java/org/apache/iotdb/lsm/context/QueryContext.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.lsm.context; + +import org.apache.iotdb.lsm.strategy.PostOrderAccessStrategy; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class QueryContext extends Context { + + List keys; + + public QueryContext(Object... ks) { + super(); + keys = new ArrayList<>(); + keys.addAll(Arrays.asList(ks)); + type = ContextType.QUERY; + accessStrategy = new PostOrderAccessStrategy(); + } + + public Object getKey() { + return keys.get(level); + } + + public int size() { + return keys.size(); + } +} diff --git a/lsm/src/main/java/org/apache/iotdb/lsm/example/Main.java b/lsm/src/main/java/org/apache/iotdb/lsm/example/Main.java new file mode 100644 index 000000000000..7dbe793ac5fc --- /dev/null +++ b/lsm/src/main/java/org/apache/iotdb/lsm/example/Main.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.lsm.example; + +import org.apache.iotdb.lsm.context.FlushContext; +import org.apache.iotdb.lsm.context.InsertContext; +import org.apache.iotdb.lsm.levelProcess.FlushLevelProcess; +import org.apache.iotdb.lsm.levelProcess.InsertLevelProcess; +import org.apache.iotdb.lsm.manager.BasicLsmManager; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class Main { + public static void main(String[] args) throws Exception { + MemTableManager memTableManager = new MemTableManager(); + System.out.println("-------------insert--------------"); + insertionExample(memTableManager); + System.out.println("-------------flush--------------"); + flushExample(memTableManager); + } + + public static void insertionExample(MemTableManager memTableManager) throws Exception { + + BasicLsmManager baseLsmManager = + new BasicLsmManager(); + baseLsmManager + .nextLevel( + new InsertLevelProcess() { + @Override + public List getChildren(MemTableManager memNode, InsertContext context) { + Integer deviceID = (Integer) context.getValue(); + int maxDeviceID = memNode.getMaxDeviceID(); + List children = new ArrayList<>(); + if (deviceID / 65536 == maxDeviceID / 65536) { + children.add(memNode.getWorking()); + } else { + children.add(memNode.getImmutables().get(deviceID / 65536)); + } + return children; + } + + @Override + public void insert(MemTableManager memNode, InsertContext context) { + Integer deviceID = (Integer) context.getValue(); + int maxDeviceID = memNode.getMaxDeviceID(); + if (deviceID / 65536 == maxDeviceID / 65536) { + if (memNode.getWorking() == null) { + memNode.setWorking(new MemTable()); + } + } else if (deviceID > maxDeviceID) { + memNode + .getImmutables() + .put(memNode.getMaxDeviceID() / 65536, memNode.getWorking()); + memNode.setWorking(new MemTable()); + } + if (deviceID > maxDeviceID) { + memNode.setMaxDeviceID(deviceID); + } + } + }) + .nextLevel( + new InsertLevelProcess() { + @Override + public List getChildren(MemTable memNode, InsertContext context) { + String key = (String) context.getKey(); + List children = new ArrayList<>(); + children.add(memNode.getMap().get(key)); + return children; + } + + @Override + public void insert(MemTable memNode, InsertContext context) { + String key = (String) context.getKey(); + Map map = memNode.getMap(); + if (map.containsKey(key)) return; + map.put(key, new MemGroup()); + } + }) + .nextLevel( + new InsertLevelProcess() { + @Override + public List getChildren(MemGroup memNode, InsertContext context) { + String key = (String) context.getKey(); + List children = new ArrayList<>(); + children.add(memNode.getMap().get(key)); + return children; + } + + @Override + public void insert(MemGroup memNode, InsertContext context) { + String key = (String) context.getKey(); + Map map = memNode.getMap(); + if (map.containsKey(key)) return; + map.put(key, new MemChunk()); + } + }) + .nextLevel( + new InsertLevelProcess() { + @Override + public List getChildren(MemChunk memNode, InsertContext context) { + return null; + } + + @Override + public void insert(MemChunk memNode, InsertContext context) { + Integer deviceID = (Integer) context.getValue(); + List deviceIDs = memNode.getDeviceIDS(); + deviceIDs.add(deviceID); + } + }); + + baseLsmManager.process(memTableManager, new InsertContext(1, null, "a", "b")); + baseLsmManager.process(memTableManager, new InsertContext(2, null, "a", "d")); + baseLsmManager.process(memTableManager, new InsertContext(3, null, "a", "e")); + baseLsmManager.process(memTableManager, new InsertContext(4, null, "a", "b")); + baseLsmManager.process(memTableManager, new InsertContext(5, null, "a1", "b")); + baseLsmManager.process(memTableManager, new InsertContext(6, null, "a2", "b")); + baseLsmManager.process(memTableManager, new InsertContext(65535, null, "a", "b")); + baseLsmManager.process(memTableManager, new InsertContext(65536, null, "a", "b")); + baseLsmManager.process(memTableManager, new InsertContext(2, null, "a", "d")); + baseLsmManager.process(memTableManager, new InsertContext(3, null, "a", "e")); + baseLsmManager.process(memTableManager, new InsertContext(4, null, "a", "b")); + baseLsmManager.process(memTableManager, new InsertContext(5, null, "a1", "b")); + baseLsmManager.process(memTableManager, new InsertContext(6, null, "a2", "b")); + System.out.println(memTableManager); + } + + public static void flushExample(MemTableManager memTableManager) throws Exception { + BasicLsmManager flushManager = + new BasicLsmManager(); + + flushManager + .nextLevel( + new FlushLevelProcess() { + @Override + public void flush(MemTableManager memNode, FlushContext context) { + System.out.println("FLUSH: " + memNode + "-->[level:" + context.getLevel() + "]"); + } + + @Override + public List getChildren(MemTableManager memNode, FlushContext context) { + List memTables = new ArrayList<>(); + memTables.addAll(memNode.getImmutables().values()); + if (memNode.getWorking() != null) memTables.add(memNode.getWorking()); + return memTables; + } + }) + .nextLevel( + new FlushLevelProcess() { + @Override + public void flush(MemTable memNode, FlushContext context) { + System.out.println("FLUSH: " + memNode + "-->[level:" + context.getLevel() + "]"); + } + + @Override + public List getChildren(MemTable memNode, FlushContext context) { + List memGroups = new ArrayList<>(); + memGroups.addAll(memNode.getMap().values()); + return memGroups; + } + }) + .nextLevel( + new FlushLevelProcess() { + @Override + public void flush(MemGroup memNode, FlushContext context) { + System.out.println("FLUSH: " + memNode + "-->[level:" + context.getLevel() + "]"); + } + + @Override + public List getChildren(MemGroup memNode, FlushContext context) { + List memChunk = new ArrayList<>(); + memChunk.addAll(memNode.getMap().values()); + return memChunk; + } + }) + .nextLevel( + new FlushLevelProcess() { + @Override + public void flush(MemChunk memNode, FlushContext context) { + System.out.println("FLUSH: " + memNode + "-->[level:" + context.getLevel() + "]"); + } + + @Override + public List getChildren(MemChunk memNode, FlushContext context) { + return new ArrayList<>(); + } + }); + + flushManager.process(memTableManager, new FlushContext()); + } +} diff --git a/lsm/src/main/java/org/apache/iotdb/lsm/example/MemChunk.java b/lsm/src/main/java/org/apache/iotdb/lsm/example/MemChunk.java new file mode 100644 index 000000000000..c6e97fd52d9d --- /dev/null +++ b/lsm/src/main/java/org/apache/iotdb/lsm/example/MemChunk.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.lsm.example; + +import java.util.ArrayList; +import java.util.List; + +// device list +public class MemChunk { + List deviceIDS; + + public MemChunk() { + deviceIDS = new ArrayList<>(); + } + + public List getDeviceIDS() { + return deviceIDS; + } + + public void setDeviceIDS(List deviceIDS) { + this.deviceIDS = deviceIDS; + } + + @Override + public String toString() { + return "MemChunk{" + deviceIDS.toString() + '}'; + } +} diff --git a/lsm/src/main/java/org/apache/iotdb/lsm/example/MemGroup.java b/lsm/src/main/java/org/apache/iotdb/lsm/example/MemGroup.java new file mode 100644 index 000000000000..69f61d5d451f --- /dev/null +++ b/lsm/src/main/java/org/apache/iotdb/lsm/example/MemGroup.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.lsm.example; + +import java.util.HashMap; +import java.util.Map; + +// tagvalue -> memChunk +public class MemGroup { + + Map map; + + public MemGroup() { + map = new HashMap<>(); + } + + public Map getMap() { + return map; + } + + public void setMap(Map map) { + this.map = map; + } + + @Override + public String toString() { + return "MemGroup{" + map.toString() + '}'; + } +} diff --git a/lsm/src/main/java/org/apache/iotdb/lsm/example/MemTable.java b/lsm/src/main/java/org/apache/iotdb/lsm/example/MemTable.java new file mode 100644 index 000000000000..7cc82f19ecab --- /dev/null +++ b/lsm/src/main/java/org/apache/iotdb/lsm/example/MemTable.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.lsm.example; + +import java.util.HashMap; +import java.util.Map; + +// tagkey - > MemGroup +public class MemTable { + + private Map map; + + public MemTable() { + map = new HashMap<>(); + } + + public Map getMap() { + return map; + } + + public void setMap(Map map) { + this.map = map; + } + + @Override + public String toString() { + return "MemTable{" + map.toString() + '}'; + } +} diff --git a/lsm/src/main/java/org/apache/iotdb/lsm/example/MemTableManager.java b/lsm/src/main/java/org/apache/iotdb/lsm/example/MemTableManager.java new file mode 100644 index 000000000000..d0f50071747c --- /dev/null +++ b/lsm/src/main/java/org/apache/iotdb/lsm/example/MemTableManager.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.lsm.example; + +import java.util.HashMap; +import java.util.Map; + +// 管理working memtable , immutable memtables,框架用户自定义 +public class MemTableManager { + + // 可写的memtable + private MemTable working; + + // 只读的memtables + private Map immutables; + + // 记录已插入的最大的deviceid + private int maxDeviceID; + + public MemTableManager() { + working = new MemTable(); + immutables = new HashMap<>(); + maxDeviceID = 0; + } + + public MemTable getWorking() { + return working; + } + + public void setWorking(MemTable working) { + this.working = working; + } + + public Map getImmutables() { + return immutables; + } + + public void setImmutables(Map immutables) { + this.immutables = immutables; + } + + public int getMaxDeviceID() { + return maxDeviceID; + } + + public void setMaxDeviceID(int maxDeviceID) { + this.maxDeviceID = maxDeviceID; + } + + @Override + public String toString() { + return "MemTableManager{" + + "working=" + + working.toString() + + ", immutables=" + + immutables.toString() + + '}'; + } +} diff --git a/lsm/src/main/java/org/apache/iotdb/lsm/levelProcess/BasicLevelProcess.java b/lsm/src/main/java/org/apache/iotdb/lsm/levelProcess/BasicLevelProcess.java new file mode 100644 index 000000000000..f4ced68ec5a0 --- /dev/null +++ b/lsm/src/main/java/org/apache/iotdb/lsm/levelProcess/BasicLevelProcess.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.lsm.levelProcess; + +import org.apache.iotdb.lsm.context.Context; + +import java.util.List; + +public abstract class BasicLevelProcess implements LevelProcess { + LevelProcess next; + + public abstract void handle(I memNode, C context); + + public abstract List getChildren(I memNode, C context); + + @Override + public LevelProcess nextLevel(LevelProcess next) { + this.next = next; + return next; + } + + @Override + public void process(I memNode, C context) { + context.getAccessStrategy().execute(this, memNode, context); + } + + public boolean hasNext() { + return next != null; + } + + public LevelProcess getNext() { + return next; + } +} diff --git a/lsm/src/main/java/org/apache/iotdb/lsm/levelProcess/DeleteLevelProcess.java b/lsm/src/main/java/org/apache/iotdb/lsm/levelProcess/DeleteLevelProcess.java new file mode 100644 index 000000000000..f4a4c97cd0f5 --- /dev/null +++ b/lsm/src/main/java/org/apache/iotdb/lsm/levelProcess/DeleteLevelProcess.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.lsm.levelProcess; + +import org.apache.iotdb.lsm.context.DeleteContext; + +public abstract class DeleteLevelProcess extends BasicLevelProcess { + + public abstract void delete(I memNode, DeleteContext context); + + @Override + public void handle(I memNode, DeleteContext context) { + delete(memNode, context); + } +} diff --git a/lsm/src/main/java/org/apache/iotdb/lsm/levelProcess/FlushLevelProcess.java b/lsm/src/main/java/org/apache/iotdb/lsm/levelProcess/FlushLevelProcess.java new file mode 100644 index 000000000000..00363c7683b7 --- /dev/null +++ b/lsm/src/main/java/org/apache/iotdb/lsm/levelProcess/FlushLevelProcess.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.lsm.levelProcess; + +import org.apache.iotdb.lsm.context.FlushContext; + +public abstract class FlushLevelProcess extends BasicLevelProcess { + + public abstract void flush(I memNode, FlushContext context); + + public void handle(I memNode, FlushContext context) { + flush(memNode, context); + } +} diff --git a/lsm/src/main/java/org/apache/iotdb/lsm/levelProcess/InsertLevelProcess.java b/lsm/src/main/java/org/apache/iotdb/lsm/levelProcess/InsertLevelProcess.java new file mode 100644 index 000000000000..91f7e24c1f25 --- /dev/null +++ b/lsm/src/main/java/org/apache/iotdb/lsm/levelProcess/InsertLevelProcess.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.lsm.levelProcess; + +import org.apache.iotdb.lsm.context.InsertContext; + +public abstract class InsertLevelProcess extends BasicLevelProcess { + + public abstract void insert(I memNode, InsertContext context); + + @Override + public void handle(I memNode, InsertContext context) { + insert(memNode, context); + } +} diff --git a/lsm/src/main/java/org/apache/iotdb/lsm/levelProcess/LevelProcess.java b/lsm/src/main/java/org/apache/iotdb/lsm/levelProcess/LevelProcess.java new file mode 100644 index 000000000000..aa04c2bf172b --- /dev/null +++ b/lsm/src/main/java/org/apache/iotdb/lsm/levelProcess/LevelProcess.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.lsm.levelProcess; + +import org.apache.iotdb.lsm.context.Context; + +public interface LevelProcess { + LevelProcess nextLevel(LevelProcess next); + + void process(I memNode, C context); + + void handle(I memNode, C context); +} diff --git a/lsm/src/main/java/org/apache/iotdb/lsm/levelProcess/QueryLevelProcess.java b/lsm/src/main/java/org/apache/iotdb/lsm/levelProcess/QueryLevelProcess.java new file mode 100644 index 000000000000..5b3fc0e9ebb3 --- /dev/null +++ b/lsm/src/main/java/org/apache/iotdb/lsm/levelProcess/QueryLevelProcess.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.lsm.levelProcess; + +import org.apache.iotdb.lsm.context.QueryContext; + +public abstract class QueryLevelProcess extends BasicLevelProcess { + + public abstract void query(I memNode, QueryContext context); + + @Override + public void handle(I memNode, QueryContext context) { + query(memNode, context); + } +} diff --git a/lsm/src/main/java/org/apache/iotdb/lsm/manager/BasicLsmManager.java b/lsm/src/main/java/org/apache/iotdb/lsm/manager/BasicLsmManager.java new file mode 100644 index 000000000000..1855f04c0fe4 --- /dev/null +++ b/lsm/src/main/java/org/apache/iotdb/lsm/manager/BasicLsmManager.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.lsm.manager; + +import org.apache.iotdb.lsm.context.Context; +import org.apache.iotdb.lsm.levelProcess.LevelProcess; + +public class BasicLsmManager implements LsmManager { + + LevelProcess levelProcess; + + public void preProcess(T root, C context) throws Exception {} + + public void postProcess(T root, C context) throws Exception {} + + @Override + public void process(T root, C context) throws Exception { + preProcess(root, context); + levelProcess.process(root, context); + postProcess(root, context); + } + + @Override + public LevelProcess nextLevel(LevelProcess levelProcess) { + this.levelProcess = levelProcess; + return levelProcess; + } +} diff --git a/lsm/src/main/java/org/apache/iotdb/lsm/manager/LsmManager.java b/lsm/src/main/java/org/apache/iotdb/lsm/manager/LsmManager.java new file mode 100644 index 000000000000..2940ad60b175 --- /dev/null +++ b/lsm/src/main/java/org/apache/iotdb/lsm/manager/LsmManager.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.lsm.manager; + +import org.apache.iotdb.lsm.context.Context; +import org.apache.iotdb.lsm.levelProcess.LevelProcess; + +public interface LsmManager { + + void process(T memNode, C context) throws Exception; + + LevelProcess nextLevel(LevelProcess next); +} diff --git a/lsm/src/main/java/org/apache/iotdb/lsm/strategy/AccessStrategy.java b/lsm/src/main/java/org/apache/iotdb/lsm/strategy/AccessStrategy.java new file mode 100644 index 000000000000..9ff3f0f0a03e --- /dev/null +++ b/lsm/src/main/java/org/apache/iotdb/lsm/strategy/AccessStrategy.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.lsm.strategy; + +import org.apache.iotdb.lsm.context.Context; +import org.apache.iotdb.lsm.levelProcess.BasicLevelProcess; + +// 表示内存节点访问策略(先序,后序) +public interface AccessStrategy { + + /** + * @param levelProcess 保存当前节点和子节点的处理方法 + * @param memNode 当前待处理的节点 + * @param context 上下文信息 + */ + void execute( + BasicLevelProcess levelProcess, I memNode, C context); +} diff --git a/lsm/src/main/java/org/apache/iotdb/lsm/strategy/BFSAccessStrategy.java b/lsm/src/main/java/org/apache/iotdb/lsm/strategy/BFSAccessStrategy.java new file mode 100644 index 000000000000..cf794b7097e6 --- /dev/null +++ b/lsm/src/main/java/org/apache/iotdb/lsm/strategy/BFSAccessStrategy.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.lsm.strategy; + +import org.apache.iotdb.lsm.context.Context; +import org.apache.iotdb.lsm.levelProcess.BasicLevelProcess; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; + +public class BFSAccessStrategy implements AccessStrategy { + + Queue sameLevelMemNodes; + + @Override + public void execute( + BasicLevelProcess levelProcess, I memNode, C context) { + List children = new ArrayList<>(); + int currentLevel = context.getLevel(); + // 第一个使用bfs策略的节点 + if (sameLevelMemNodes == null) { + sameLevelMemNodes = new LinkedList<>(); + levelProcess.handle(memNode, context); + children = levelProcess.getChildren(memNode, context); + } else { + while (!sameLevelMemNodes.isEmpty()) { + I node = (I) sameLevelMemNodes.poll(); + levelProcess.handle(node, context); + children.addAll(levelProcess.getChildren(node, context)); + } + } + sameLevelMemNodes.addAll(children); + context.setLevel(currentLevel + 1); + if (levelProcess.hasNext() && !sameLevelMemNodes.isEmpty()) { + levelProcess.getNext().process(null, context); + } + } +} diff --git a/lsm/src/main/java/org/apache/iotdb/lsm/strategy/PostOrderAccessStrategy.java b/lsm/src/main/java/org/apache/iotdb/lsm/strategy/PostOrderAccessStrategy.java new file mode 100644 index 000000000000..307904176370 --- /dev/null +++ b/lsm/src/main/java/org/apache/iotdb/lsm/strategy/PostOrderAccessStrategy.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.lsm.strategy; + +import org.apache.iotdb.lsm.context.Context; +import org.apache.iotdb.lsm.levelProcess.BasicLevelProcess; + +import java.util.List; + +public class PostOrderAccessStrategy implements AccessStrategy { + + @Override + public void execute( + BasicLevelProcess levelProcess, I memNode, C context) { + int currentLevel = context.getLevel(); + AccessStrategy accessStrategy = context.getAccessStrategy(); + List children = levelProcess.getChildren(memNode, context); + // 处理子节点 + if (levelProcess.hasNext()) { + context.setLevel(currentLevel + 1); + for (O child : children) { + levelProcess.getNext().process(child, context); + } + } + + context.setLevel(currentLevel); + context.setAccessStrategy(accessStrategy); + // 处理该节点 + levelProcess.handle(memNode, context); + } +} diff --git a/lsm/src/main/java/org/apache/iotdb/lsm/strategy/PreOrderAccessStrategy.java b/lsm/src/main/java/org/apache/iotdb/lsm/strategy/PreOrderAccessStrategy.java new file mode 100644 index 000000000000..2c0b1b7b03a6 --- /dev/null +++ b/lsm/src/main/java/org/apache/iotdb/lsm/strategy/PreOrderAccessStrategy.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.lsm.strategy; + +import org.apache.iotdb.lsm.context.Context; +import org.apache.iotdb.lsm.levelProcess.BasicLevelProcess; + +import java.util.List; + +public class PreOrderAccessStrategy implements AccessStrategy { + + @Override + public void execute( + BasicLevelProcess levelProcess, I memNode, C context) { + int currentLevel = context.getLevel(); + // 处理该节点 + levelProcess.handle(memNode, context); + List children = levelProcess.getChildren(memNode, context); + + // 处理子节点 + if (levelProcess.hasNext()) { + context.setLevel(currentLevel + 1); + for (O child : children) { + levelProcess.getNext().process(child, context); + } + } + context.setLevel(currentLevel); + } +} diff --git a/lsm/src/main/java/org/apache/iotdb/lsm/strategy/RBFSAccessStrategy.java b/lsm/src/main/java/org/apache/iotdb/lsm/strategy/RBFSAccessStrategy.java new file mode 100644 index 000000000000..62d62295786f --- /dev/null +++ b/lsm/src/main/java/org/apache/iotdb/lsm/strategy/RBFSAccessStrategy.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.lsm.strategy; + +import org.apache.iotdb.lsm.context.Context; +import org.apache.iotdb.lsm.levelProcess.BasicLevelProcess; + +import java.util.List; + +public class RBFSAccessStrategy implements AccessStrategy { + @Override + public void execute( + BasicLevelProcess levelProcess, I memNode, C context) { + int currentLevel = context.getLevel(); + if (Integer.MAX_VALUE == context.getLevelUpperBound() && !levelProcess.hasNext()) { + context.setLevelUpperBound(context.getLevel()); + } + // 如果是根节点 + if (currentLevel == 0) { + while (context.getLevelUpperBound() != currentLevel) { + List children = levelProcess.getChildren(memNode, context); + for (O child : children) { + // 处理子节点 + context.setLevel(currentLevel + 1); + levelProcess.getNext().process(child, context); + context.setLevel(currentLevel); + } + // 每次处理完-1 + context.setLevelUpperBound(context.getLevelUpperBound() - 1); + } + // 处理root节点 + levelProcess.handle(memNode, context); + return; + } + + // 已经处理过,直接return + if (currentLevel > context.getLevelUpperBound()) return; + + // 处理子节点 + if (currentLevel == context.getLevelUpperBound()) { + levelProcess.handle(memNode, context); + return; + } + List children = levelProcess.getChildren(memNode, context); + for (O child : children) { + context.setLevel(currentLevel + 1); + levelProcess.getNext().process(child, context); + context.setLevel(currentLevel); + } + } +} diff --git a/lsm/src/main/java/org/apache/iotdb/lsm/wal/IWALReader.java b/lsm/src/main/java/org/apache/iotdb/lsm/wal/IWALReader.java new file mode 100644 index 000000000000..6d52ef1577fc --- /dev/null +++ b/lsm/src/main/java/org/apache/iotdb/lsm/wal/IWALReader.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.lsm.wal; + +import java.io.FileNotFoundException; +import java.io.IOException; + +public interface IWALReader { + + void close() throws IOException; + + boolean hasNext() throws FileNotFoundException; + + WALRecord next() throws FileNotFoundException; +} diff --git a/lsm/src/main/java/org/apache/iotdb/lsm/wal/IWALWriter.java b/lsm/src/main/java/org/apache/iotdb/lsm/wal/IWALWriter.java new file mode 100644 index 000000000000..6d3fecb21e2e --- /dev/null +++ b/lsm/src/main/java/org/apache/iotdb/lsm/wal/IWALWriter.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.lsm.wal; + +import java.io.IOException; + +public interface IWALWriter { + + void write(WALRecord walRecord) throws IOException; + + void force() throws IOException; + + void close() throws IOException; +} diff --git a/lsm/src/main/java/org/apache/iotdb/lsm/wal/WALReader.java b/lsm/src/main/java/org/apache/iotdb/lsm/wal/WALReader.java new file mode 100644 index 000000000000..a3fef56b0860 --- /dev/null +++ b/lsm/src/main/java/org/apache/iotdb/lsm/wal/WALReader.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.lsm.wal; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedInputStream; +import java.io.DataInputStream; +import java.io.EOFException; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.NoSuchElementException; + +public class WALReader implements IWALReader { + private static final Logger logger = LoggerFactory.getLogger(WALReader.class); + private final File logFile; + private final WALRecord prototype; + private final DataInputStream logStream; + private WALRecord nextRecord; + private boolean fileCorrupted = false; + + public WALReader(File logFile, WALRecord prototype) throws IOException { + this.logFile = logFile; + this.logStream = + new DataInputStream(new BufferedInputStream(Files.newInputStream(logFile.toPath()))); + this.prototype = prototype; + } + + @Override + public void close() throws IOException { + logStream.close(); + } + + @Override + public boolean hasNext() { + if (nextRecord != null) { + return true; + } + try { + if (fileCorrupted) { + return false; + } + int logSize = logStream.readInt(); + if (logSize <= 0) { + return false; + } + nextRecord = prototype.clone(); + nextRecord.deserialize(logStream); + } catch (EOFException e) { + logger.info(""); + return false; + } catch (IOException e) { + logger.warn(""); + fileCorrupted = true; + return false; + } + return true; + } + + @Override + public WALRecord next() { + if (nextRecord == null) { + throw new NoSuchElementException(); + } + WALRecord walRecord = nextRecord; + nextRecord = null; + return walRecord; + } + + @Override + public String toString() { + return "WALReader{" + "logFile=" + logFile + '}'; + } +} diff --git a/lsm/src/main/java/org/apache/iotdb/lsm/wal/WALRecord.java b/lsm/src/main/java/org/apache/iotdb/lsm/wal/WALRecord.java new file mode 100644 index 000000000000..4410b6f8a200 --- /dev/null +++ b/lsm/src/main/java/org/apache/iotdb/lsm/wal/WALRecord.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.lsm.wal; + +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +public abstract class WALRecord implements Cloneable { + + public abstract void serialize(ByteBuffer buffer); + + public abstract void deserialize(DataInputStream stream) throws IOException; + + @Override + public WALRecord clone() { + try { + return (WALRecord) super.clone(); + } catch (CloneNotSupportedException e) { + throw new AssertionError(); + } + } +} diff --git a/lsm/src/main/java/org/apache/iotdb/lsm/wal/WALWriter.java b/lsm/src/main/java/org/apache/iotdb/lsm/wal/WALWriter.java new file mode 100644 index 000000000000..dccd8a92233d --- /dev/null +++ b/lsm/src/main/java/org/apache/iotdb/lsm/wal/WALWriter.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.lsm.wal; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.FileChannel; + +public class WALWriter implements IWALWriter { + private static final Logger logger = LoggerFactory.getLogger(WALWriter.class); + private File logFile; + private FileOutputStream fileOutputStream; + private FileChannel channel; + private final ByteBuffer lengthBuffer = ByteBuffer.allocate(4); + private final ByteBuffer walBuffer = ByteBuffer.allocate(10000); + private final boolean forceEachWrite; + + public WALWriter(File logFile, boolean forceEachWrite) throws FileNotFoundException { + this.logFile = logFile; + this.forceEachWrite = forceEachWrite; + fileOutputStream = new FileOutputStream(logFile, true); + channel = fileOutputStream.getChannel(); + } + + @Override + public void write(WALRecord walRecord) throws IOException { + if (channel == null) { + fileOutputStream = new FileOutputStream(logFile, true); + channel = fileOutputStream.getChannel(); + } + walBuffer.clear(); + walRecord.serialize(walBuffer); + walBuffer.flip(); + int logSize = walBuffer.limit(); + lengthBuffer.clear(); + lengthBuffer.putInt(logSize); + lengthBuffer.flip(); + + try { + channel.write(lengthBuffer); + channel.write(walBuffer); + + if (this.forceEachWrite) { + channel.force(true); + } + } catch (ClosedChannelException ignored) { + logger.warn("someone interrupt current thread, so no need to do write for io safety"); + } + } + + @Override + public void force() throws IOException { + if (channel != null && channel.isOpen()) { + channel.force(true); + } + } + + @Override + public void close() throws IOException { + if (channel != null) { + if (channel.isOpen()) { + channel.force(true); + } + fileOutputStream.close(); + fileOutputStream = null; + channel.close(); + channel = null; + } + } + + @Override + public String toString() { + return "WALLogWriter{" + "logFile=" + logFile + '}'; + } +} diff --git a/pom.xml b/pom.xml index f43d7aa3132d..f5b6c61ec7a9 100644 --- a/pom.xml +++ b/pom.xml @@ -120,6 +120,8 @@ udf-api rewrite-tsfile-tool external-api + lsm + schema-engine-tag diff --git a/schema-engine-tag/pom.xml b/schema-engine-tag/pom.xml new file mode 100644 index 000000000000..c5b8f565525a --- /dev/null +++ b/schema-engine-tag/pom.xml @@ -0,0 +1,44 @@ + + + + + iotdb-parent + org.apache.iotdb + 0.14.0-SNAPSHOT + + 4.0.0 + schema-engine-tag + schema-engine-tag + + + org.roaringbitmap + RoaringBitmap + 0.9.32 + + + org.apache.iotdb + iotdb-lsm + ${project.version} + + + org.apache.iotdb + iotdb-server + ${project.version} + + + diff --git a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/MockTagSchemaRegion.java b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/MockTagSchemaRegion.java new file mode 100644 index 000000000000..184ce789b99c --- /dev/null +++ b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/MockTagSchemaRegion.java @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.metadata.tagSchemaRegion; + +import org.apache.iotdb.common.rpc.thrift.TSchemaNode; +import org.apache.iotdb.commons.consensus.SchemaRegionId; +import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.exception.metadata.PathNotExistException; +import org.apache.iotdb.db.metadata.LocalSchemaProcessor; +import org.apache.iotdb.db.metadata.mnode.IMNode; +import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode; +import org.apache.iotdb.db.metadata.path.MeasurementPath; +import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion; +import org.apache.iotdb.db.metadata.template.Template; +import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo; +import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.sys.ActivateTemplateInClusterPlan; +import org.apache.iotdb.db.qp.physical.sys.ActivateTemplatePlan; +import org.apache.iotdb.db.qp.physical.sys.AutoCreateDeviceMNodePlan; +import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan; +import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan; +import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan; +import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan; +import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan; +import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.dataset.ShowDevicesResult; +import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.utils.Pair; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; + +public class MockTagSchemaRegion implements ISchemaRegion { + + @Override + public void init() throws MetadataException {} + + @Override + public void clear() {} + + @Override + public void forceMlog() {} + + @Override + public SchemaRegionId getSchemaRegionId() { + return null; + } + + @Override + public String getStorageGroupFullPath() { + return null; + } + + @Override + public void deleteSchemaRegion() throws MetadataException {} + + @Override + public boolean createSnapshot(File snapshotDir) { + return false; + } + + @Override + public void loadSnapshot(File latestSnapshotRootDir) {} + + @Override + public void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws MetadataException {} + + @Override + public void createAlignedTimeSeries(CreateAlignedTimeSeriesPlan plan) throws MetadataException {} + + @Override + public Pair> deleteTimeseries(PartialPath pathPattern, boolean isPrefixMatch) + throws MetadataException { + return null; + } + + @Override + public void autoCreateDeviceMNode(AutoCreateDeviceMNodePlan plan) throws MetadataException {} + + @Override + public boolean isPathExist(PartialPath path) throws MetadataException { + return false; + } + + @Override + public int getAllTimeseriesCount(PartialPath pathPattern, boolean isPrefixMatch) + throws MetadataException { + return 0; + } + + @Override + public int getAllTimeseriesCount( + PartialPath pathPattern, Map templateMap, boolean isPrefixMatch) + throws MetadataException { + return 0; + } + + @Override + public int getAllTimeseriesCount( + PartialPath pathPattern, boolean isPrefixMatch, String key, String value, boolean isContains) + throws MetadataException { + return 0; + } + + @Override + public Map getMeasurementCountGroupByLevel( + PartialPath pathPattern, int level, boolean isPrefixMatch) throws MetadataException { + return null; + } + + @Override + public Map getMeasurementCountGroupByLevel( + PartialPath pathPattern, + int level, + boolean isPrefixMatch, + String key, + String value, + boolean isContains) + throws MetadataException { + return null; + } + + @Override + public int getDevicesNum(PartialPath pathPattern, boolean isPrefixMatch) + throws MetadataException { + return 0; + } + + @Override + public int getNodesCountInGivenLevel(PartialPath pathPattern, int level, boolean isPrefixMatch) + throws MetadataException { + return 0; + } + + @Override + public List getNodesListInGivenLevel( + PartialPath pathPattern, + int nodeLevel, + boolean isPrefixMatch, + LocalSchemaProcessor.StorageGroupFilter filter) + throws MetadataException { + return null; + } + + @Override + public Set getChildNodePathInNextLevel(PartialPath pathPattern) + throws MetadataException { + return null; + } + + @Override + public Set getChildNodeNameInNextLevel(PartialPath pathPattern) throws MetadataException { + return null; + } + + @Override + public Set getBelongedDevices(PartialPath timeseries) throws MetadataException { + return null; + } + + @Override + public Set getMatchedDevices(PartialPath pathPattern, boolean isPrefixMatch) + throws MetadataException { + return null; + } + + @Override + public Pair, Integer> getMatchedDevices(ShowDevicesPlan plan) + throws MetadataException { + return null; + } + + @Override + public List getMeasurementPaths(PartialPath pathPattern, boolean isPrefixMatch) + throws MetadataException { + return null; + } + + @Override + public Pair, Integer> getMeasurementPathsWithAlias( + PartialPath pathPattern, int limit, int offset, boolean isPrefixMatch) + throws MetadataException { + return null; + } + + @Override + public List fetchSchema( + PartialPath pathPattern, Map templateMap) throws MetadataException { + return null; + } + + @Override + public Pair, Integer> showTimeseries( + ShowTimeSeriesPlan plan, QueryContext context) throws MetadataException { + return null; + } + + @Override + public List getAllMeasurementByDevicePath(PartialPath devicePath) + throws PathNotExistException { + return null; + } + + @Override + public IMNode getDeviceNode(PartialPath path) throws MetadataException { + return null; + } + + @Override + public IMeasurementMNode getMeasurementMNode(PartialPath fullPath) throws MetadataException { + return null; + } + + @Override + public void changeAlias(PartialPath path, String alias) throws MetadataException, IOException {} + + @Override + public void upsertTagsAndAttributes( + String alias, + Map tagsMap, + Map attributesMap, + PartialPath fullPath) + throws MetadataException, IOException {} + + @Override + public void addAttributes(Map attributesMap, PartialPath fullPath) + throws MetadataException, IOException {} + + @Override + public void addTags(Map tagsMap, PartialPath fullPath) + throws MetadataException, IOException {} + + @Override + public void dropTagsOrAttributes(Set keySet, PartialPath fullPath) + throws MetadataException, IOException {} + + @Override + public void setTagsOrAttributesValue(Map alterMap, PartialPath fullPath) + throws MetadataException, IOException {} + + @Override + public void renameTagOrAttributeKey(String oldKey, String newKey, PartialPath fullPath) + throws MetadataException, IOException {} + + @Override + public IMNode getSeriesSchemasAndReadLockDevice(InsertPlan plan) + throws MetadataException, IOException { + return null; + } + + @Override + public DeviceSchemaInfo getDeviceSchemaInfoWithAutoCreate( + PartialPath devicePath, + String[] measurements, + Function getDataType, + boolean aligned) + throws MetadataException { + return null; + } + + @Override + public Set getPathsSetTemplate(String templateName) throws MetadataException { + return null; + } + + @Override + public Set getPathsUsingTemplate(String templateName) throws MetadataException { + return null; + } + + @Override + public boolean isTemplateAppendable(Template template, List measurements) + throws MetadataException { + return false; + } + + @Override + public void setSchemaTemplate(SetTemplatePlan plan) throws MetadataException {} + + @Override + public void unsetSchemaTemplate(UnsetTemplatePlan plan) throws MetadataException {} + + @Override + public void setUsingSchemaTemplate(ActivateTemplatePlan plan) throws MetadataException {} + + @Override + public void activateSchemaTemplate(ActivateTemplateInClusterPlan plan, Template template) + throws MetadataException {} + + @Override + public List getPathsUsingTemplate(int templateId) throws MetadataException { + return null; + } + + @Override + public IMNode getMNodeForTrigger(PartialPath fullPath) throws MetadataException { + return null; + } + + @Override + public void releaseMNodeAfterDropTrigger(IMNode node) throws MetadataException {} +} diff --git a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java new file mode 100644 index 000000000000..2f04d7c095a7 --- /dev/null +++ b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.metadata.tagSchemaRegion; + +public class TagSchemaRegion {} diff --git a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/ITagInvertedIndex.java b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/ITagInvertedIndex.java new file mode 100644 index 000000000000..d5269bbce987 --- /dev/null +++ b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/ITagInvertedIndex.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex; + +import org.apache.iotdb.lsm.context.DeleteContext; +import org.apache.iotdb.lsm.context.InsertContext; + +import java.util.List; +import java.util.Map; + +public interface ITagInvertedIndex { + + void addTags(InsertContext context); + + void addTags(Map tags, int id); + + void removeTags(DeleteContext context); + + void removeTags(Map tags, int id); + + List getMatchedIDs(Map tags); +} diff --git a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/TagInvertedIndex.java b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/TagInvertedIndex.java new file mode 100644 index 000000000000..9ae8b0ddb2dc --- /dev/null +++ b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/TagInvertedIndex.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex; + +import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.deletion.DeletionManager; +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.insertion.InsertionManager; +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.memtable.MemTable; +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.query.QueryManager; +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.recover.RecoverManager; +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.wal.WALManager; +import org.apache.iotdb.lsm.context.DeleteContext; +import org.apache.iotdb.lsm.context.InsertContext; +import org.apache.iotdb.lsm.context.QueryContext; + +import org.roaringbitmap.RoaringBitmap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class TagInvertedIndex implements ITagInvertedIndex { + private static final Logger logger = LoggerFactory.getLogger(TagInvertedIndex.class); + + private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + + private final InsertionManager insertionManager; + + private final DeletionManager deletionManager; + + private final QueryManager queryManager; + + private final WALManager walManager; + + private final RecoverManager recoverManager; + + private final int numOfDeviceIdsInMemTable; + + private final Map immutableMemTables; + + private MemTable workingMemTable; + + private int maxDeviceID; + + public TagInvertedIndex(String schemaDirPath) throws IOException { + walManager = new WALManager(schemaDirPath); + insertionManager = new InsertionManager(walManager); + deletionManager = new DeletionManager(walManager); + recoverManager = new RecoverManager(walManager); + queryManager = new QueryManager(); + workingMemTable = new MemTable(MemTable.WORKING); + immutableMemTables = new HashMap<>(); + numOfDeviceIdsInMemTable = config.getNumOfDeviceIdsInMemTable(); + maxDeviceID = 0; + recover(); + } + + public synchronized void recover() { + recoverManager.recover(this); + } + + @Override + public synchronized void addTags(InsertContext context) { + int id = (int) context.getValue(); + if (!inWorkingMemTable(id)) { + workingMemTable.setStatus(MemTable.IMMUTABLE); + immutableMemTables.put(maxDeviceID / numOfDeviceIdsInMemTable, workingMemTable); + workingMemTable = new MemTable(MemTable.WORKING); + } + MemTable memTable = workingMemTable; + maxDeviceID = id; + try { + insertionManager.process(memTable, context); + } catch (Exception e) { + logger.error(e.getMessage()); + } + } + + @Override + public synchronized void addTags(Map tags, int id) { + if (!inWorkingMemTable(id)) { + workingMemTable.setStatus(MemTable.IMMUTABLE); + immutableMemTables.put(maxDeviceID / numOfDeviceIdsInMemTable, workingMemTable); + workingMemTable = new MemTable(MemTable.WORKING); + } + MemTable memTable = workingMemTable; + maxDeviceID = id; + try { + for (Map.Entry tag : tags.entrySet()) { + addTag(memTable, tag.getKey(), tag.getValue(), id); + } + } catch (Exception e) { + logger.error(e.getMessage()); + } + } + + @Override + public void removeTags(DeleteContext context) { + int id = (int) context.getValue(); + MemTable memTable = null; + if (inWorkingMemTable(id)) { + memTable = workingMemTable; + } else { + memTable = immutableMemTables.get(id / numOfDeviceIdsInMemTable); + } + try { + deletionManager.process(memTable, context); + } catch (Exception e) { + logger.error(e.getMessage()); + } + } + + @Override + public synchronized void removeTags(Map tags, int id) { + List memTables = new ArrayList<>(); + if (inWorkingMemTable(id)) { + memTables.add(workingMemTable); + } else { + memTables.add(immutableMemTables.get(id / numOfDeviceIdsInMemTable)); + } + try { + for (Map.Entry tag : tags.entrySet()) { + removeTag(memTables, tag.getKey(), tag.getValue(), id); + } + } catch (Exception e) { + logger.error(e.getMessage()); + } + } + + @Override + public synchronized List getMatchedIDs(Map tags) { + List memTables = new ArrayList<>(); + memTables.add(workingMemTable); + memTables.addAll(immutableMemTables.values()); + RoaringBitmap roaringBitmap = new RoaringBitmap(); + int i = 0; + try { + for (Map.Entry tag : tags.entrySet()) { + RoaringBitmap rb = getMatchedIDs(memTables, tag.getKey(), tag.getValue()); + if (i == 0) roaringBitmap = rb; + else roaringBitmap = RoaringBitmap.and(roaringBitmap, rb); + i++; + } + } catch (Exception e) { + logger.error(e.getMessage()); + } + return Arrays.stream(roaringBitmap.toArray()).boxed().collect(Collectors.toList()); + } + + @Override + public String toString() { + return "TagInvertedIndex{" + + "numOfDeviceIdsInMemTable=" + + numOfDeviceIdsInMemTable + + ", workingMemTable=" + + workingMemTable + + ", immutableMemTables=" + + immutableMemTables + + ", maxDeviceID=" + + maxDeviceID + + '}'; + } + + private boolean inWorkingMemTable(int id) { + return id / numOfDeviceIdsInMemTable == maxDeviceID / numOfDeviceIdsInMemTable; + } + + private void addTag(MemTable memTable, String tagKey, String tagValue, int id) throws Exception { + InsertContext insertContext = new InsertContext(id, tagKey, tagValue); + insertionManager.process(memTable, insertContext); + } + + private void removeTag(List memTables, String tagKey, String tagValue, int id) + throws Exception { + DeleteContext deleteContext = new DeleteContext(id, tagKey, tagValue); + for (MemTable memTable : memTables) { + deletionManager.process(memTable, deleteContext); + } + } + + private RoaringBitmap getMatchedIDs(List memTables, String tagKey, String tagValue) + throws Exception { + QueryContext queryContext = new QueryContext(tagKey, tagValue); + for (MemTable memTable : memTables) { + queryManager.process(memTable, queryContext); + } + return (RoaringBitmap) queryContext.getResult(); + } + + @TestOnly + public void clear() throws IOException { + walManager.close(); + } +} diff --git a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/deletion/DeletionManager.java b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/deletion/DeletionManager.java new file mode 100644 index 000000000000..211eed0bbe94 --- /dev/null +++ b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/deletion/DeletionManager.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.deletion; + +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.memtable.MemTable; +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.wal.WALManager; +import org.apache.iotdb.lsm.context.DeleteContext; +import org.apache.iotdb.lsm.manager.BasicLsmManager; + +public class DeletionManager extends BasicLsmManager { + + private WALManager walManager; + + public DeletionManager(WALManager walManager) { + this.walManager = walManager; + initLevelProcess(); + } + + @Override + public void preProcess(MemTable root, DeleteContext context) throws Exception { + if (!context.isRecover()) { + walManager.write(context); + } + } + + private void initLevelProcess() { + this.nextLevel(new MemTableDeletion()) + .nextLevel(new MemChunkGroupDeletion()) + .nextLevel(new MemChunkDeletion()); + } +} diff --git a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/deletion/MemChunkDeletion.java b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/deletion/MemChunkDeletion.java new file mode 100644 index 000000000000..c95a617765b6 --- /dev/null +++ b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/deletion/MemChunkDeletion.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.deletion; + +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.memtable.MemChunk; +import org.apache.iotdb.lsm.context.DeleteContext; +import org.apache.iotdb.lsm.levelProcess.DeleteLevelProcess; + +import java.util.List; + +public class MemChunkDeletion extends DeleteLevelProcess { + @Override + public List getChildren(MemChunk memNode, DeleteContext context) { + return null; + } + + @Override + public void delete(MemChunk memNode, DeleteContext context) { + Integer deviceID = (Integer) context.getValue(); + memNode.remove(deviceID); + } +} diff --git a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/deletion/MemChunkGroupDeletion.java b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/deletion/MemChunkGroupDeletion.java new file mode 100644 index 000000000000..651553fd8603 --- /dev/null +++ b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/deletion/MemChunkGroupDeletion.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.deletion; + +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.memtable.MemChunk; +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.memtable.MemChunkGroup; +import org.apache.iotdb.lsm.context.DeleteContext; +import org.apache.iotdb.lsm.levelProcess.DeleteLevelProcess; + +import java.util.ArrayList; +import java.util.List; + +public class MemChunkGroupDeletion extends DeleteLevelProcess { + @Override + public List getChildren(MemChunkGroup memNode, DeleteContext context) { + List memChunks = new ArrayList<>(); + String tagValue = (String) context.getKey(); + MemChunk child = memNode.get(tagValue); + if (child != null) memChunks.add(child); + return memChunks; + } + + @Override + public void delete(MemChunkGroup memNode, DeleteContext context) { + String tagValue = (String) context.getKey(); + MemChunk child = memNode.get(tagValue); + if (child == null || child.isEmpty()) { + memNode.remove(tagValue); + } + } +} diff --git a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/deletion/MemTableDeletion.java b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/deletion/MemTableDeletion.java new file mode 100644 index 000000000000..0a1e5b3d0571 --- /dev/null +++ b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/deletion/MemTableDeletion.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.deletion; + +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.memtable.MemChunkGroup; +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.memtable.MemTable; +import org.apache.iotdb.lsm.context.DeleteContext; +import org.apache.iotdb.lsm.levelProcess.DeleteLevelProcess; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +public class MemTableDeletion extends DeleteLevelProcess { + @Override + public List getChildren(MemTable memNode, DeleteContext context) { + if (memNode.isImmutable()) return new ArrayList<>(); + List memChunkGroups = new ArrayList<>(); + String tagKey = (String) context.getKey(); + MemChunkGroup child = memNode.get(tagKey); + if (child != null) memChunkGroups.add(child); + return memChunkGroups; + } + + @Override + public void delete(MemTable memNode, DeleteContext context) { + if (memNode.isImmutable()) { + Set deletionList = memNode.getDeletionList(); + if (!deletionList.contains(context.getValue())) { + deletionList.add((Integer) context.getValue()); + } + return; + } + String tagKey = (String) context.getKey(); + MemChunkGroup child = memNode.get(tagKey); + if (child == null || child.isEmpty()) { + memNode.remove(tagKey); + } + } +} diff --git a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/insertion/InsertionManager.java b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/insertion/InsertionManager.java new file mode 100644 index 000000000000..0e203a3d646b --- /dev/null +++ b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/insertion/InsertionManager.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.insertion; + +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.memtable.MemTable; +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.wal.WALManager; +import org.apache.iotdb.lsm.context.InsertContext; +import org.apache.iotdb.lsm.manager.BasicLsmManager; + +import java.io.IOException; + +public class InsertionManager extends BasicLsmManager { + + private WALManager walManager; + + public InsertionManager(WALManager walManager) { + this.walManager = walManager; + initLevelProcess(); + } + + @Override + public void preProcess(MemTable root, InsertContext context) throws IOException { + if (!context.isRecover()) { + walManager.write(context); + } + } + + private void initLevelProcess() { + this.nextLevel(new MemTableInsertion()) + .nextLevel(new MemChunkGroupInsertion()) + .nextLevel(new MemChunkInsertion()); + } +} diff --git a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/insertion/MemChunkGroupInsertion.java b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/insertion/MemChunkGroupInsertion.java new file mode 100644 index 000000000000..843084223f4e --- /dev/null +++ b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/insertion/MemChunkGroupInsertion.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.insertion; + +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.memtable.MemChunk; +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.memtable.MemChunkGroup; +import org.apache.iotdb.lsm.context.InsertContext; +import org.apache.iotdb.lsm.levelProcess.InsertLevelProcess; + +import java.util.ArrayList; +import java.util.List; + +public class MemChunkGroupInsertion extends InsertLevelProcess { + @Override + public List getChildren(MemChunkGroup memNode, InsertContext context) { + List memChunks = new ArrayList<>(); + String tagValue = (String) context.getKey(); + MemChunk child = memNode.get(tagValue); + if (child != null) memChunks.add(child); + return memChunks; + } + + @Override + public void insert(MemChunkGroup memNode, InsertContext context) { + String tagValue = (String) context.getKey(); + memNode.put(tagValue); + } +} diff --git a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/insertion/MemChunkInsertion.java b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/insertion/MemChunkInsertion.java new file mode 100644 index 000000000000..5dec23dcb7d1 --- /dev/null +++ b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/insertion/MemChunkInsertion.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.insertion; + +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.memtable.MemChunk; +import org.apache.iotdb.lsm.context.InsertContext; +import org.apache.iotdb.lsm.levelProcess.InsertLevelProcess; + +import java.util.List; + +public class MemChunkInsertion extends InsertLevelProcess { + @Override + public List getChildren(MemChunk memNode, InsertContext context) { + return null; + } + + @Override + public void insert(MemChunk memNode, InsertContext context) { + Integer deviceID = (Integer) context.getValue(); + memNode.put(deviceID); + } +} diff --git a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/insertion/MemTableInsertion.java b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/insertion/MemTableInsertion.java new file mode 100644 index 000000000000..ef88f0591bc0 --- /dev/null +++ b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/insertion/MemTableInsertion.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.insertion; + +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.memtable.MemChunkGroup; +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.memtable.MemTable; +import org.apache.iotdb.lsm.context.InsertContext; +import org.apache.iotdb.lsm.levelProcess.InsertLevelProcess; + +import java.util.ArrayList; +import java.util.List; + +// memtable的insert操作 +public class MemTableInsertion extends InsertLevelProcess { + + @Override + public List getChildren(MemTable memNode, InsertContext context) { + if (memNode.isImmutable()) return new ArrayList<>(); + List memChunkGroups = new ArrayList<>(); + String tagKey = (String) context.getKey(); + MemChunkGroup child = memNode.get(tagKey); + if (child != null) memChunkGroups.add(child); + return memChunkGroups; + } + + @Override + public void insert(MemTable memNode, InsertContext context) { + if (memNode.isImmutable()) return; + String tagKey = (String) context.getKey(); + memNode.put(tagKey); + } +} diff --git a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/memtable/MemChunk.java b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/memtable/MemChunk.java new file mode 100644 index 000000000000..b46a770ca97e --- /dev/null +++ b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/memtable/MemChunk.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.memtable; + +import org.roaringbitmap.RoaringBitmap; + +// 管理设备id集合 +public class MemChunk { + private RoaringBitmap roaringBitmap; + + public MemChunk() { + roaringBitmap = new RoaringBitmap(); + } + + public boolean isEmpty() { + if (roaringBitmap == null) return true; + return roaringBitmap.isEmpty(); + } + + @Override + public String toString() { + return roaringBitmap.toString(); + } + + public void put(int id) { + roaringBitmap.add(id); + } + + public void remove(int id) { + roaringBitmap.remove(id); + } + + public RoaringBitmap getRoaringBitmap() { + return this.roaringBitmap; + } +} diff --git a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/memtable/MemChunkGroup.java b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/memtable/MemChunkGroup.java new file mode 100644 index 000000000000..f6765030183a --- /dev/null +++ b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/memtable/MemChunkGroup.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.memtable; + +import java.util.HashMap; +import java.util.Map; + +public class MemChunkGroup { + private Map memChunkMap; + + public MemChunkGroup() { + memChunkMap = new HashMap<>(); + } + + public void put(String tagValue) { + if (!memChunkMap.containsKey(tagValue)) { + memChunkMap.put(tagValue, new MemChunk()); + } + } + + @Override + public String toString() { + return memChunkMap.toString(); + } + + public MemChunk get(String tagValue) { + return memChunkMap.get(tagValue); + } + + public void remove(String tagValue) { + memChunkMap.remove(tagValue); + } + + public boolean isEmpty() { + return memChunkMap.isEmpty(); + } +} diff --git a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/memtable/MemTable.java b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/memtable/MemTable.java new file mode 100644 index 000000000000..f0730e162df1 --- /dev/null +++ b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/memtable/MemTable.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.memtable; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class MemTable { + public static final String WORKING = "working"; + + public static final String IMMUTABLE = "immutable"; + + private Map memChunkGroupMap; + + private String status; + + private Set deletionList; + + public MemTable(String status) { + memChunkGroupMap = new HashMap<>(); + this.status = status; + deletionList = new HashSet<>(); + } + + public void put(String tagKey) { + if (this.status.equals(IMMUTABLE)) return; + if (!memChunkGroupMap.containsKey(tagKey)) { + memChunkGroupMap.put(tagKey, new MemChunkGroup()); + } + } + + @Override + public String toString() { + return "MemTable{" + + "memChunkGroupMap=" + + memChunkGroupMap + + ", status='" + + status + + '\'' + + ", deletionList=" + + deletionList + + '}'; + } + + public MemChunkGroup get(String tagKey) { + return memChunkGroupMap.get(tagKey); + } + + public void remove(String tagKey) { + memChunkGroupMap.remove(tagKey); + } + + public boolean isImmutable() { + return status.equals(IMMUTABLE); + } + + public Set getDeletionList() { + return deletionList; + } + + public void setStatus(String status) { + this.status = status; + } +} diff --git a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/query/MemChunkGroupQuery.java b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/query/MemChunkGroupQuery.java new file mode 100644 index 000000000000..bf9656e82205 --- /dev/null +++ b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/query/MemChunkGroupQuery.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.query; + +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.memtable.MemChunk; +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.memtable.MemChunkGroup; +import org.apache.iotdb.lsm.context.QueryContext; +import org.apache.iotdb.lsm.levelProcess.QueryLevelProcess; + +import java.util.ArrayList; +import java.util.List; + +public class MemChunkGroupQuery extends QueryLevelProcess { + @Override + public List getChildren(MemChunkGroup memNode, QueryContext context) { + List memChunks = new ArrayList<>(); + String tagValue = (String) context.getKey(); + MemChunk child = memNode.get(tagValue); + if (child != null) memChunks.add(child); + return memChunks; + } + + @Override + public void query(MemChunkGroup memNode, QueryContext context) {} +} diff --git a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/query/MemChunkQuery.java b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/query/MemChunkQuery.java new file mode 100644 index 000000000000..d67de88f62bb --- /dev/null +++ b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/query/MemChunkQuery.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.query; + +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.memtable.MemChunk; +import org.apache.iotdb.lsm.context.QueryContext; +import org.apache.iotdb.lsm.levelProcess.QueryLevelProcess; + +import org.roaringbitmap.RoaringBitmap; + +import java.util.List; + +public class MemChunkQuery extends QueryLevelProcess { + @Override + public List getChildren(MemChunk memNode, QueryContext context) { + return null; + } + + @Override + public void query(MemChunk memNode, QueryContext context) { + RoaringBitmap roaringBitmap = (RoaringBitmap) context.getResult(); + if (roaringBitmap == null) roaringBitmap = new RoaringBitmap(); + RoaringBitmap now = RoaringBitmap.or(roaringBitmap, memNode.getRoaringBitmap()); + context.setResult(now); + } +} diff --git a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/query/MemTableQuery.java b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/query/MemTableQuery.java new file mode 100644 index 000000000000..b1acbf845b14 --- /dev/null +++ b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/query/MemTableQuery.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.query; + +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.memtable.MemChunkGroup; +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.memtable.MemTable; +import org.apache.iotdb.lsm.context.QueryContext; +import org.apache.iotdb.lsm.levelProcess.QueryLevelProcess; + +import org.roaringbitmap.RoaringBitmap; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +public class MemTableQuery extends QueryLevelProcess { + + @Override + public List getChildren(MemTable memNode, QueryContext context) { + List memChunkGroups = new ArrayList<>(); + String tagKey = (String) context.getKey(); + MemChunkGroup child = memNode.get(tagKey); + if (child != null) memChunkGroups.add(child); + return memChunkGroups; + } + + @Override + public void query(MemTable memNode, QueryContext context) { + // 如果是immutable,则需要在查询结果中删除deletionList中的id + if (memNode.isImmutable()) { + RoaringBitmap roaringBitmap = (RoaringBitmap) context.getResult(); + Set deletionList = memNode.getDeletionList(); + for (Integer id : deletionList) { + roaringBitmap.remove(id); + } + } + } +} diff --git a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/query/QueryManager.java b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/query/QueryManager.java new file mode 100644 index 000000000000..4a8e3fdbcb0e --- /dev/null +++ b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/query/QueryManager.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.query; + +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.memtable.MemTable; +import org.apache.iotdb.lsm.context.QueryContext; +import org.apache.iotdb.lsm.manager.BasicLsmManager; + +public class QueryManager extends BasicLsmManager { + + public QueryManager() { + initLevelProcess(); + } + + private void initLevelProcess() { + this.nextLevel(new MemTableQuery()) + .nextLevel(new MemChunkGroupQuery()) + .nextLevel(new MemChunkQuery()); + } +} diff --git a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/recover/RecoverManager.java b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/recover/RecoverManager.java new file mode 100644 index 000000000000..ec9fd7472956 --- /dev/null +++ b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/recover/RecoverManager.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.recover; + +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.ITagInvertedIndex; +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.wal.WALManager; +import org.apache.iotdb.lsm.context.Context; +import org.apache.iotdb.lsm.context.DeleteContext; +import org.apache.iotdb.lsm.context.InsertContext; + +public class RecoverManager { + + private WALManager walManager; + + public RecoverManager(WALManager walManager) { + this.walManager = walManager; + } + + public void recover(ITagInvertedIndex tagInvertedIndex) { + while (true) { + Context context = walManager.read(); + switch (context.getType()) { + case INSERT: + tagInvertedIndex.addTags((InsertContext) context); + break; + case DELETE: + tagInvertedIndex.removeTags((DeleteContext) context); + break; + default: + return; + } + } + } +} diff --git a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/wal/WALEntry.java b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/wal/WALEntry.java new file mode 100644 index 000000000000..6cf24548492a --- /dev/null +++ b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/wal/WALEntry.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.wal; + +import org.apache.iotdb.lsm.wal.WALRecord; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; + +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +public class WALEntry extends WALRecord { + private int type; + + private List keys; + + private int deviceID; + + public WALEntry() {super();} + + public WALEntry(int type, List keys, int deviceID) { + super(); + this.type = type; + this.keys = keys; + this.deviceID = deviceID; + } + + @Override + public void serialize(ByteBuffer buffer) { + ReadWriteIOUtils.write(type, buffer); + ReadWriteIOUtils.write(deviceID, buffer); + ReadWriteIOUtils.write(keys.size(), buffer); + for (String key : keys) { + ReadWriteIOUtils.write(key, buffer); + } + } + + @Override + public void deserialize(DataInputStream stream) throws IOException { + this.type = stream.readInt(); + this.deviceID = stream.readInt(); + int length = stream.readInt(); + this.keys = new ArrayList<>(); + for (int i = 0; i < length; i++) { + String key = ReadWriteIOUtils.readString(stream); + keys.add(key); + } + } + + public int getType() { + return type; + } + + public void setType(int type) { + this.type = type; + } + + public List getKeys() { + return keys; + } + + public void setKeys(List keys) { + this.keys = keys; + } + + public int getDeviceID() { + return deviceID; + } + + public void setDeviceID(int deviceID) { + this.deviceID = deviceID; + } + + @Override + public String toString() { + return "WALEntry{" + "type=" + type + ", keys=" + keys + ", deviceID=" + deviceID + '}'; + } +} diff --git a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/wal/WALManager.java b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/wal/WALManager.java new file mode 100644 index 000000000000..b0ac296f6c15 --- /dev/null +++ b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/wal/WALManager.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.wal; + +import org.apache.iotdb.lsm.context.Context; +import org.apache.iotdb.lsm.context.DeleteContext; +import org.apache.iotdb.lsm.context.InsertContext; +import org.apache.iotdb.lsm.wal.WALReader; +import org.apache.iotdb.lsm.wal.WALWriter; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class WALManager { + private static final String WAL_FILE_NAME = "tagInvertedIndex.log"; + private static final int INSERT = 1; + private static final int DELETE = 2; + private final String schemaDirPath; + private File walFile; + private WALWriter walWriter; + private WALReader walReader; + + public WALManager(String schemaDirPath) throws IOException { + this.schemaDirPath = schemaDirPath; + initFile(schemaDirPath); + walWriter = new WALWriter(walFile, false); + walReader = new WALReader(walFile, new WALEntry()); + } + + private void initFile(String schemaDirPath) throws IOException { + File schemaDir = new File(schemaDirPath); + schemaDir.mkdirs(); + walFile = new File(this.schemaDirPath, WAL_FILE_NAME); + if (!walFile.exists()) { + walFile.createNewFile(); + } + } + + public synchronized void write(Context context) throws IOException { + switch (context.getType()) { + case INSERT: + process((InsertContext) context); + break; + case DELETE: + process((DeleteContext) context); + break; + default: + break; + } + } + + // 用于recover + public synchronized Context read() { + if (walReader.hasNext()) { + WALEntry walEntry = (WALEntry) walReader.next(); + if (walEntry.getType() == INSERT) { + return generateInsertContext(walEntry); + } + if (walEntry.getType() == DELETE) { + return generateDeleteContext(walEntry); + } + } + return new Context(); + } + + private InsertContext generateInsertContext(WALEntry walEntry) { + InsertContext insertContext = new InsertContext(); + List objects = new ArrayList<>(); + objects.addAll(walEntry.getKeys()); + insertContext.setKeys(objects); + insertContext.setValue(walEntry.getDeviceID()); + insertContext.setRecover(true); + return insertContext; + } + + private DeleteContext generateDeleteContext(WALEntry walEntry) { + DeleteContext deleteContext = new DeleteContext(walEntry.getDeviceID(), walEntry.getKeys()); + List objects = new ArrayList<>(); + objects.addAll(walEntry.getKeys()); + deleteContext.setKeys(objects); + deleteContext.setValue(walEntry.getDeviceID()); + deleteContext.setRecover(true); + return deleteContext; + } + + private void process(InsertContext insertContext) throws IOException { + List objects = insertContext.getKeys(); + List keys = new ArrayList<>(); + for (Object o : objects) { + keys.add((String) o); + } + WALEntry walEntry = new WALEntry(INSERT, keys, (Integer) insertContext.getValue()); + walWriter.write(walEntry); + } + + private void process(DeleteContext deleteContext) throws IOException { + List objects = deleteContext.getKeys(); + List keys = new ArrayList<>(); + for (Object o : objects) { + keys.add((String) o); + } + WALEntry walEntry = new WALEntry(DELETE, keys, (Integer) deleteContext.getValue()); + walWriter.write(walEntry); + } + + public void close() throws IOException { + walWriter.close(); + walReader.close(); + } +} diff --git a/schema-engine-tag/src/test/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/TagInvertedIndexTest.java b/schema-engine-tag/src/test/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/TagInvertedIndexTest.java new file mode 100644 index 000000000000..f193862a6859 --- /dev/null +++ b/schema-engine-tag/src/test/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/TagInvertedIndexTest.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex; + +import org.apache.iotdb.commons.utils.FileUtils; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.tsfile.utils.Pair; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class TagInvertedIndexTest { + private String[][] record = + new String[][] { + {"tag1=q", "tag2=a", "1"}, + {"tag1=q", "tag2=s", "2"}, + {"tag1=q", "tag2=a", "tag3=z", "3"}, + {"tag1=q", "tag3=v", "4"}, + {"tag1=q", "tag2=s", "5"}, + {"tag1=w", "tag2=d", "6"}, + {"tag1=q", "tag2=d", "tag3=e", "7"}, + {"tag1=t", "tag2=g", "8"}, + {"tag1=r", "tag2=d", "9"}, + {"tag1=t", "tag2=f", "10"}, + {"tag1=t", "tag2=h", "11"}, + {"tag1=q", "tag2=a", "tag3=l", "12"}, + {"tag1=y", "tag2=j", "13"}, + {"tag1=u", "tag2=k", "14"}, + {"tag1=q", "tag2=a", "tag3=x", "15"}, + {"tag1=q", "tag2=a", "tag4=z", "16"}, + {"tag1=y", "tag2=a", "tag4=z", "17"}, + {"tag1=q", "tag2=b", "tag3=x", "18"}, + }; + + private int numOfDeviceIdsInMemTable; + + private TagInvertedIndex tagInvertedIndex; + + private String storageGroupDirPath; + + private String schemaRegionDirPath; + + private String storageGroupFullPath = "root/testTagIndex"; + + private String schemaDir; + + @Before + public void setUp() throws Exception { + numOfDeviceIdsInMemTable = + IoTDBDescriptor.getInstance().getConfig().getNumOfDeviceIdsInMemTable(); + IoTDBDescriptor.getInstance().getConfig().setNumOfDeviceIdsInMemTable(3); + schemaDir = IoTDBDescriptor.getInstance().getConfig().getSchemaDir(); + storageGroupDirPath = schemaDir + File.separator + storageGroupFullPath; + schemaRegionDirPath = storageGroupDirPath + File.separator + 0; + tagInvertedIndex = new TagInvertedIndex(schemaRegionDirPath); + } + + @After + public void tearDown() throws Exception { + IoTDBDescriptor.getInstance().getConfig().setNumOfDeviceIdsInMemTable(numOfDeviceIdsInMemTable); + tagInvertedIndex.clear(); + tagInvertedIndex = null; + FileUtils.deleteDirectoryAndEmptyParent(new File(schemaDir)); + } + + public void addTags() { + List, Integer>> records = generateTags(); + for (Pair, Integer> pair : records) { + tagInvertedIndex.addTags(pair.left, pair.right); + } + } + + public void removeTags() { + Pair, Integer> tags = generateTag(record[0]); + tagInvertedIndex.removeTags(tags.left, tags.right); + tags = generateTag(record[1]); + tagInvertedIndex.removeTags(tags.left, tags.right); + tags = generateTag(record[3]); + tagInvertedIndex.removeTags(tags.left, tags.right); + tags = generateTag(record[11]); + tagInvertedIndex.removeTags(tags.left, tags.right); + } + + @Test + public void getMatchedIDs() { + addTags(); + Map tags1 = new HashMap<>(); + tags1.put("tag1", "q"); + + Map tags2 = new HashMap<>(); + tags2.put("tag1", "q"); + tags2.put("tag2", "a"); + + List ids = tagInvertedIndex.getMatchedIDs(tags1); + List verify = Arrays.asList(1, 2, 3, 4, 5, 7, 12, 15, 16, 18); + assertEquals(verify, ids); + + ids = tagInvertedIndex.getMatchedIDs(tags2); + verify = Arrays.asList(1, 3, 12, 15, 16); + assertEquals(verify, ids); + + removeTags(); + + ids = tagInvertedIndex.getMatchedIDs(tags1); + verify = Arrays.asList(3, 5, 7, 15, 16, 18); + assertEquals(verify, ids); + + ids = tagInvertedIndex.getMatchedIDs(tags2); + verify = Arrays.asList(3, 15, 16); + assertEquals(verify, ids); + } + + @Test + public void testRecover() throws IOException { + Map tags1 = new HashMap<>(); + tags1.put("tag1", "q"); + + Map tags2 = new HashMap<>(); + tags2.put("tag1", "q"); + tags2.put("tag2", "a"); + addTags(); + removeTags(); + + tagInvertedIndex.clear(); + tagInvertedIndex = new TagInvertedIndex(schemaRegionDirPath); + + List ids = tagInvertedIndex.getMatchedIDs(tags1); + List verify = Arrays.asList(3, 5, 7, 15, 16, 18); + assertEquals(verify, ids); + + ids = tagInvertedIndex.getMatchedIDs(tags2); + verify = Arrays.asList(3, 15, 16); + assertEquals(verify, ids); + } + + private List, Integer>> generateTags() { + List, Integer>> pairs = new ArrayList<>(); + for (String[] strings : record) { + pairs.add(generateTag(strings)); + } + return pairs; + } + + private Pair, Integer> generateTag(String[] strings) { + Map tags = new HashMap<>(); + int i = 0; + for (; i < strings.length - 1; i++) { + String[] str = strings[i].split("="); + tags.put(str[0], str[1]); + } + Pair, Integer> pair = new Pair<>(tags, Integer.valueOf(strings[i])); + return pair; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 750d10ec3d78..495a7bb7bcac 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -839,6 +839,8 @@ public class IoTDBConfig { */ private boolean enableIDTableLogFile = false; + private int numOfDeviceIdsInMemTable = 65536; + /** whether to use persistent schema mode */ private String schemaEngineMode = "Memory"; @@ -2751,6 +2753,14 @@ public void setEnableIDTableLogFile(boolean enableIDTableLogFile) { this.enableIDTableLogFile = enableIDTableLogFile; } + public int getNumOfDeviceIdsInMemTable() { + return numOfDeviceIdsInMemTable; + } + + public void setNumOfDeviceIdsInMemTable(int numOfDeviceIdsInMemTable) { + this.numOfDeviceIdsInMemTable = numOfDeviceIdsInMemTable; + } + public String getSchemaEngineMode() { return schemaEngineMode; }