diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/DataNodeMemoryConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/DataNodeMemoryConfig.java
index efb6758066f1..d07bcc61d74b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/DataNodeMemoryConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/DataNodeMemoryConfig.java
@@ -56,6 +56,12 @@ public class DataNodeMemoryConfig {
   /** whether to cache metadata(ChunkMetaData and TsFileMetaData) or not. */
   private boolean metaDataCacheEnable = true;
 
+  /**
+   * If a timeseries is not found in a TsFile, also cache a placeholder to indicate its
+   * non-existence.
+   */
+  private boolean mayCacheNonExistSeries = true;
+
   /** How many threads can concurrently execute query statement. When <= 0, use CPU core number. */
   private int queryThreadCount = Runtime.getRuntime().availableProcessors();
 
@@ -407,6 +413,10 @@ private void initQueryEngineMemoryAllocate(
         Boolean.parseBoolean(
             properties.getProperty(
                 "meta_data_cache_enable", Boolean.toString(isMetaDataCacheEnable()))));
+    setMayCacheNonExistSeries(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "may_cache_nonexist_series", Boolean.toString(isMetaDataCacheEnable()))));
     setQueryThreadCount(
         Integer.parseInt(
             properties.getProperty("query_thread_count", Integer.toString(getQueryThreadCount()))));
@@ -560,6 +570,14 @@ public void setMetaDataCacheEnable(boolean metaDataCacheEnable) {
     this.metaDataCacheEnable = metaDataCacheEnable;
   }
 
+  public boolean isMayCacheNonExistSeries() {
+    return mayCacheNonExistSeries;
+  }
+
+  public void setMayCacheNonExistSeries(boolean mayCacheNonExistSeries) {
+    this.mayCacheNonExistSeries = mayCacheNonExistSeries;
+  }
+
   public int getQueryThreadCount() {
     return queryThreadCount;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java
index c3fef67b5f1f..4fb532b32d65 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java
@@ -35,10 +35,13 @@
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.Weigher;
 import org.apache.tsfile.common.constant.TsFileConstant;
+import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
 import org.apache.tsfile.read.TsFileSequenceReader;
 import org.apache.tsfile.utils.BloomFilter;
+import org.apache.tsfile.utils.PublicBAOS;
 import org.apache.tsfile.utils.RamUsageEstimator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,8 +72,16 @@ public class TimeSeriesMetadataCache {
       IoTDBDescriptor.getInstance().getMemoryConfig();
   private static final IMemoryBlock CACHE_MEMORY_BLOCK;
   private static final boolean CACHE_ENABLE = memoryConfig.isMetaDataCacheEnable();
+  private static final TimeseriesMetadata NULL_EXISTS_CACHE_PLACE_HOLDER =
+      new TimeseriesMetadata(
+          (byte) 0,
+          0,
+          "",
+          TSDataType.INT32,
+          Statistics.getStatsByType(TSDataType.INT32),
+          new PublicBAOS());
 
-  private final Cache<TimeSeriesMetadataCacheKey, TimeseriesMetadata> lruCache;
+  private Cache<TimeSeriesMetadataCacheKey, TimeseriesMetadata> lruCache;
 
   private final AtomicLong entryAverageSize = new AtomicLong(0);
 
@@ -78,6 +89,9 @@ public class TimeSeriesMetadataCache {
           Collections.synchronizedMap(new WeakHashMap<>());
   private static final String SEPARATOR = "$";
 
+  private final AtomicLong evictedExistingEntryCount = new AtomicLong(0);
+  private final AtomicLong evictedNonExistingEntryCount = new AtomicLong(0);
+
   static {
     CACHE_MEMORY_BLOCK =
         memoryConfig
@@ -98,8 +112,20 @@ private TimeSeriesMetadataCache() {
             .weigher(
                 (Weigher<TimeSeriesMetadataCacheKey, TimeseriesMetadata>)
                     (key, value) ->
-                        (int) (key.getRetainedSizeInBytes() + value.getRetainedSizeInBytes()))
+                        (int)
+                            (key.getRetainedSizeInBytes()
+                                + (value == NULL_EXISTS_CACHE_PLACE_HOLDER
+                                    ? 0
+                                    : value.getRetainedSizeInBytes())))
             .recordStats()
+            .evictionListener(
+                (k, v, c) -> {
+                  if (v == NULL_EXISTS_CACHE_PLACE_HOLDER) {
+                    evictedNonExistingEntryCount.incrementAndGet();
+                  } else {
+                    evictedExistingEntryCount.incrementAndGet();
+                  }
+                })
             .build();
     // add metrics
     MetricService.getInstance().addMetricSet(new TimeSeriesMetadataCacheMetrics(this));
@@ -217,10 +243,13 @@ public TimeseriesMetadata get(
           }
         }
       }
-      if (timeseriesMetadata == null) {
+      if (timeseriesMetadata == null || timeseriesMetadata == NULL_EXISTS_CACHE_PLACE_HOLDER) {
         if (debug) {
           DEBUG_LOGGER.info("The file doesn't have this time series {}.", key);
         }
+        if (timeseriesMetadata == null && memoryConfig.isMayCacheNonExistSeries()) {
+          lruCache.put(key, NULL_EXISTS_CACHE_PLACE_HOLDER);
+        }
         return null;
       } else {
         if (debug) {
@@ -283,8 +312,35 @@ public double calculateBloomFilterHitRatio() {
 
   /** clear LRUCache. */
   public void clear() {
-    lruCache.invalidateAll();
-    lruCache.cleanUp();
+    logger.info(
+        "Evicted non-existing/existing series count: {}/{}({}), total request: {}",
+        evictedNonExistingEntryCount.get(),
+        evictedExistingEntryCount.get(),
+        ((double) evictedNonExistingEntryCount.get()) / evictedExistingEntryCount.get(),
+        lruCache.stats().requestCount());
+    lruCache =
+        Caffeine.newBuilder()
+            .maximumWeight(CACHE_MEMORY_BLOCK.getTotalMemorySizeInBytes())
+            .weigher(
+                (Weigher<TimeSeriesMetadataCacheKey, TimeseriesMetadata>)
+                    (key, value) ->
+                        (int)
+                            (key.getRetainedSizeInBytes()
+                                + (value == NULL_EXISTS_CACHE_PLACE_HOLDER
+                                    ? 0
+                                    : value.getRetainedSizeInBytes())))
+            .recordStats()
+            .evictionListener(
+                (k, v, c) -> {
+                  if (v == NULL_EXISTS_CACHE_PLACE_HOLDER) {
+                    evictedNonExistingEntryCount.incrementAndGet();
+                  } else {
+                    evictedExistingEntryCount.incrementAndGet();
+                  }
+                })
+            .build();
+    evictedNonExistingEntryCount.set(0);
+    evictedExistingEntryCount.set(0);
   }
 
   public void remove(TimeSeriesMetadataCacheKey key) {
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCacheTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCacheTest.java
new file mode 100644
index 000000000000..f8a67ed5b529
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCacheTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.buffer;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
+import org.apache.iotdb.db.storageengine.buffer.TimeSeriesMetadataCache.TimeSeriesMetadataCacheKey;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.IDeviceID.Factory;
+import org.apache.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.tsfile.write.TsFileWriter;
+import org.apache.tsfile.write.record.TSRecord;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class TimeSeriesMetadataCacheTest {
+
+  private void testCachePlaceHolderInternal()
+      throws IOException, WriteProcessException, ExecutionException, InterruptedException {
+    File file = new File("target/test.tsfile");
+    TsFileID tsFileID = new TsFileID();
+
+    int deviceCnt = 2000;
+    int seriesPerDevice = 2000;
+    double nonExistSeriesRatio = 1.0;
+    int concurrency = 10;
+    List<IDeviceID> deviceIDList = new ArrayList<>();
+    for (int i = 0; i < deviceCnt; i++) {
+      deviceIDList.add(Factory.DEFAULT_FACTORY.create("root.d" + i));
+    }
+    ExecutorService executor = Executors.newFixedThreadPool(concurrency);
+    List<Future<Void>> futures = new ArrayList<>();
+
+    try (TsFileWriter tsFileWriter = new TsFileWriter(file)) {
+      // deviceCnt * seriesPerDevice (2000 * 2000) series in the file
+
+      for (int i = 0; i < deviceCnt; i++) {
+        for (int j = 0; j < seriesPerDevice; j++) {
+          tsFileWriter.registerTimeseries(
+              deviceIDList.get(i), new MeasurementSchema("s" + j, TSDataType.INT32));
+        }
+      }
+      for (int i = 0; i < deviceCnt; i++) {
+        TSRecord rec = new TSRecord(deviceIDList.get(i), 0);
+        for (int j = 0; j < seriesPerDevice; j++) {
+          rec.addPoint("s" + j, 0);
+        }
+        tsFileWriter.writeRecord(rec);
+      }
+      tsFileWriter.close();
+
+      // read each of the deviceCnt * 2 * seriesPerDevice series (half of them non-existent)
+      // 10 times from the file
+      long start = System.currentTimeMillis();
+      QueryContext queryContext = new QueryContext();
+      // put k in outer loop
+      int devicePerThread = deviceCnt / concurrency;
+      for (int c = 0; c < concurrency; c++) {
+        final int finalC = c;
+        QueryContext finalQueryContext = queryContext;
+        futures.add(
+            executor.submit(
+                () -> {
+                  for (int k = 0; k < 10; k++) {
+                    for (int i = devicePerThread * finalC;
+                        i < devicePerThread * (finalC + 1);
+                        i++) {
+                      for (int j = 0;
+                          j < seriesPerDevice + (int) (seriesPerDevice * nonExistSeriesRatio);
+                          j++) {
+                        TimeSeriesMetadataCacheKey key =
+                            new TimeSeriesMetadataCacheKey(tsFileID, deviceIDList.get(i), "s" + j);
+                        TimeseriesMetadata timeseriesMetadata =
+                            TimeSeriesMetadataCache.getInstance()
+                                .get(
+                                    file.getPath(),
+                                    key,
+                                    Collections.singleton("s" + j),
+                                    true,
+                                    false,
+                                    finalQueryContext);
+                        if (j < seriesPerDevice) {
+                          assertNotNull(timeseriesMetadata);
+                          assertEquals("s" + j, timeseriesMetadata.getMeasurementId());
+                        } else {
+                          assertNull(timeseriesMetadata);
+                        }
+                      }
+                    }
+                  }
+                  return null;
+                }));
+      }
+      for (Future<Void> future : futures) {
+        future.get();
+      }
+      futures.clear();
+      System.out.println("time cost with outer k: " + (System.currentTimeMillis() - start) + "ms");
+      TimeSeriesMetadataCache.getInstance().clear();
+
+      start = System.currentTimeMillis();
+      queryContext = new QueryContext();
+      // put k in inner loop
+      for (int c = 0; c < concurrency; c++) {
+        final int finalC = c;
+        QueryContext finalQueryContext = queryContext;
+        futures.add(
+            executor.submit(
+                () -> {
+                  for (int i = devicePerThread * finalC; i < devicePerThread * (finalC + 1); i++) {
+                    for (int j = 0;
+                        j < seriesPerDevice + (int) (seriesPerDevice * nonExistSeriesRatio);
+                        j++) {
+                      TimeSeriesMetadataCacheKey key =
+                          new TimeSeriesMetadataCacheKey(tsFileID, deviceIDList.get(i), "s" + j);
+                      for (int k = 0; k < 10; k++) {
+                        TimeseriesMetadata timeseriesMetadata =
+                            TimeSeriesMetadataCache.getInstance()
+                                .get(
+                                    file.getPath(),
+                                    key,
+                                    Collections.singleton("s" + j),
+                                    true,
+                                    false,
+                                    finalQueryContext);
+                        if (j < seriesPerDevice) {
+                          assertNotNull(timeseriesMetadata);
+                          assertEquals("s" + j, timeseriesMetadata.getMeasurementId());
+                        } else {
+                          assertNull(timeseriesMetadata);
+                        }
+                      }
+                    }
+                  }
+                  return null;
+                }));
+      }
+      for (Future<Void> future : futures) {
+        future.get();
+      }
+      futures.clear();
+
+      System.out.println("time cost with inner k: " + (System.currentTimeMillis() - start) + "ms");
+      TimeSeriesMetadataCache.getInstance().clear();
+    } finally {
+      file.delete();
+    }
+  }
+
+  @Ignore("Performance")
+  @Test
+  public void testCachePlaceHolder()
+      throws IOException, WriteProcessException, ExecutionException, InterruptedException {
+    boolean mayCacheNonExistSeries =
+        IoTDBDescriptor.getInstance().getMemoryConfig().isMayCacheNonExistSeries();
+    try {
+      System.out.println("warming up");
+      System.out.println("Do not cache non-exist series");
+      IoTDBDescriptor.getInstance().getMemoryConfig().setMayCacheNonExistSeries(false);
+      testCachePlaceHolderInternal();
+      System.out.println("Cache non-exist series");
+      IoTDBDescriptor.getInstance().getMemoryConfig().setMayCacheNonExistSeries(true);
+      testCachePlaceHolderInternal();
+
+      System.out.println("actual test");
+      System.out.println("Do not cache non-exist series");
+      IoTDBDescriptor.getInstance().getMemoryConfig().setMayCacheNonExistSeries(false);
+      testCachePlaceHolderInternal();
+      System.out.println("Cache non-exist series");
+      IoTDBDescriptor.getInstance().getMemoryConfig().setMayCacheNonExistSeries(true);
+      testCachePlaceHolderInternal();
+    } finally {
+      IoTDBDescriptor.getInstance()
+          .getMemoryConfig()
+          .setMayCacheNonExistSeries(mayCacheNonExistSeries);
+    }
+  }
+}
diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index 207c0507093b..78c7701b4f71 100644
--- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -1035,6 +1035,11 @@ read_consistency_level=strong
 # Datatype: boolean
 meta_data_cache_enable=true
 
+# Whether to also cache a placeholder for series that do not exist in a TsFile.
+# effectiveMode: restart
+# Datatype: boolean
+may_cache_nonexist_series=true
+
 # Read memory Allocation Ratio: BloomFilterCache : ChunkCache : TimeSeriesMetadataCache : Coordinator : Operators : DataExchange : timeIndex in TsFileResourceList : others.
 # The parameter form is a:b:c:d:e:f:g:h, where a, b, c, d, e, f, g and h are integers. for example: 1:1:1:1:1:1:1:1 , 1:100:200:50:200:200:200:50
 # effectiveMode: restart
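
For context, here is a minimal, self-contained sketch of the negative-caching pattern the patch applies to TimeSeriesMetadataCache: a shared sentinel value is cached for keys known to be absent, and the weigher charges it nothing so placeholders cannot evict real entries. It is illustrative only; NegativeCachingSketch, backingStore, and NOT_FOUND are assumed names rather than IoTDB or patch APIs, and only the Caffeine calls (Caffeine.newBuilder, maximumWeight, weigher, recordStats, getIfPresent, put) mirror the ones used above.

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Weigher;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class NegativeCachingSketch {

  // Shared sentinel stored for keys that are known to be absent; compared by identity.
  private static final String NOT_FOUND = new String("NOT_FOUND");

  // Stand-in for the expensive source (the TsFile read in the real cache).
  private final Map<String, String> backingStore = new ConcurrentHashMap<>();

  private final Cache<String, String> cache =
      Caffeine.newBuilder()
          .maximumWeight(1024L * 1024L)
          .weigher(
              // Placeholders weigh nothing, so they never push real entries out of the cache.
              (Weigher<String, String>) (key, value) -> value == NOT_FOUND ? 0 : value.length())
          .recordStats()
          .build();

  /** Returns the value for key, or null if the backing store does not contain it. */
  public String get(String key) {
    String cached = cache.getIfPresent(key);
    if (cached != null) {
      return cached == NOT_FOUND ? null : cached;
    }
    String loaded = backingStore.get(key);
    // Cache the miss too, so repeated lookups of a non-existent key skip the backing store.
    cache.put(key, loaded == null ? NOT_FOUND : loaded);
    return loaded;
  }

  public static void main(String[] args) {
    NegativeCachingSketch sketch = new NegativeCachingSketch();
    sketch.backingStore.put("root.d0.s0", "metadata");
    System.out.println(sketch.get("root.d0.s0"));   // "metadata", loaded then cached
    System.out.println(sketch.get("root.d0.s999")); // null, placeholder cached
    System.out.println(sketch.get("root.d0.s999")); // null, answered from the placeholder
  }
}

The production version in the patch follows the same shape with NULL_EXISTS_CACHE_PLACE_HOLDER, guarded by may_cache_nonexist_series so the behavior can be switched off at restart.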