diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java new file mode 100644 index 0000000000000..5da0b545e215f --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java @@ -0,0 +1,149 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.indices; + +import org.opensearch.action.search.SearchResponse; +import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.cache.tier.TierType; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.index.cache.request.RequestCacheStats; +import org.opensearch.index.cache.request.ShardRequestCache; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.test.OpenSearchIntegTestCase; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse; + +// This is a separate file from IndicesRequestCacheIT because we only want to run our test +// on a node with a maximum request cache size that we set. 
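+// The test fills the small on-heap tier until older entries spill over to the disk tier, then fetches those
+// spilled entries again and checks that the disk-specific stats (disk reaches and total get time) are incremented.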
+ +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class IndicesRequestCacheDiskTierIT extends OpenSearchIntegTestCase { + public void testDiskTierStats() throws Exception { + int heapSizeBytes = 4729; + String node = internalCluster().startNode( + Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), new ByteSizeValue(heapSizeBytes)) + ); + Client client = client(node); + + Settings.Builder indicesSettingBuilder = Settings.builder() + .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0); + + assertAcked( + client.admin().indices().prepareCreate("index").setMapping("k", "type=keyword").setSettings(indicesSettingBuilder).get() + ); + indexRandom(true, client.prepareIndex("index").setSource("k", "hello")); + ensureSearchable("index"); + SearchResponse resp; + + resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + 0)).get(); + int requestSize = (int) getCacheSizeBytes(client, "index", TierType.ON_HEAP); + assertTrue(heapSizeBytes > requestSize); + // If this fails, increase heapSizeBytes! We can't adjust it after getting the size of one query + // as the cache size setting is not dynamic + + int numOnDisk = 5; + int numRequests = heapSizeBytes / requestSize + numOnDisk; + for (int i = 1; i < numRequests; i++) { + resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + i)).get(); + assertSearchResponse(resp); + IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.ON_HEAP, false); + IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.DISK, false); + } + + + long diskReachesSoFar = getDiskStatsAccumulator(client, "index").getTotalDiskReaches(); + long tookTimeSoFar = getDiskStatsAccumulator(client, "index").getTotalGetTime(); + // So far, disk-specific stats should be 0, as keystore has prevented any actual disk reaches + // assertEquals(diskReachesSoFar, 0); // TODO: Once keystore is integrated, this value should be 0 + // assertEquals(getTimeSoFar, 0); // TODO: Once keystore is integrated, this value should be 0 + + // long tookTimeSoFar = assertDiskTierSpecificStats(client, "index", 0, -1, 0); // TODO: Uncomment once keystore is integrated + + // the first request, for "hello0", should have been evicted to the disk tier + resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello0")).get(); + IndicesRequestCacheIT.assertCacheState(client, "index", 0, numRequests + 1, TierType.ON_HEAP, false); + IndicesRequestCacheIT.assertCacheState(client, "index", 1, numRequests, TierType.DISK, false); + tookTimeSoFar = assertDiskTierSpecificStats(client, "index", 1 + diskReachesSoFar, tookTimeSoFar, -1); + + // We make another actual request that should be in the disk tier. 
Disk specific stats should keep incrementing + resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello1")).get(); + IndicesRequestCacheIT.assertCacheState(client, "index", 0, numRequests + 2, TierType.ON_HEAP, false); + IndicesRequestCacheIT.assertCacheState(client, "index", 2, numRequests, TierType.DISK, false); + tookTimeSoFar = assertDiskTierSpecificStats(client, "index", 2 + diskReachesSoFar, tookTimeSoFar, -1); + + // A final request for something in neither tier shouldn't increment disk specific stats (once keystore is on) + resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + numRequests)).get(); + IndicesRequestCacheIT.assertCacheState(client, "index", 0, numRequests + 3, TierType.ON_HEAP, false); + IndicesRequestCacheIT.assertCacheState(client, "index", 2, numRequests + 1, TierType.DISK, false); + //assertDiskTierSpecificStats(client, "index", 2 + diskReachesSoFar, tookTimeSoFar, tookTimeSoFar); + // TODO: Uncomment once keystore is integrated + + } + + private long getCacheSizeBytes(Client client, String index, TierType tierType) { + RequestCacheStats requestCacheStats = client.admin() + .indices() + .prepareStats(index) + .setRequestCache(true) + .get() + .getTotal() + .getRequestCache(); + return requestCacheStats.getMemorySizeInBytes(tierType); + } + + private long assertDiskTierSpecificStats(Client client, String index, long totalDiskReaches, long totalGetTimeLowerBound, long totalGetTimeUpperBound) { + // set bounds to -1 to ignore them + ShardRequestCache.DiskStatsAccumulator specStats = getDiskStatsAccumulator(client, index); + assertEquals(totalDiskReaches, specStats.getTotalDiskReaches()); + long tookTime = specStats.getTotalGetTime(); + assertTrue(tookTime >= totalGetTimeLowerBound || totalGetTimeLowerBound < 0); + assertTrue(tookTime <= totalGetTimeUpperBound || totalGetTimeUpperBound < 0); + return tookTime; // Return for use in next check + } + + private ShardRequestCache.DiskStatsAccumulator getDiskStatsAccumulator(Client client, String index) { + RequestCacheStats requestCacheStats = client.admin() + .indices() + .prepareStats(index) + .setRequestCache(true) + .get() + .getTotal() + .getRequestCache(); + return requestCacheStats.getDiskSpecificStats(); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java index 5dcd5a9f44c7a..9680f407d9aaa 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java @@ -663,7 +663,9 @@ public void testCacheWithInvalidation() throws Exception { resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); assertSearchResponse(resp); // Should expect hit as here as refresh didn't happen - assertCacheState(client, "index", 1, 1); + assertCacheState(client, "index", 1, 1, TierType.ON_HEAP, false); + assertCacheState(client, "index", 0, 1, TierType.DISK, false); + assertNumCacheEntries(client, "index", 1, TierType.ON_HEAP); // Explicit refresh would invalidate cache refresh(); @@ -671,10 +673,20 @@ public void testCacheWithInvalidation() throws Exception { resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); assertSearchResponse(resp); // Should expect miss as 
key has changed due to change in IndexReader.CacheKey (due to refresh) - assertCacheState(client, "index", 1, 2); + assertCacheState(client, "index", 1, 2, TierType.ON_HEAP, false); + assertCacheState(client, "index", 0, 2, TierType.DISK, false); + + //assertNumCacheEntries(client, "index", 1, TierType.ON_HEAP); // Evictions won't be 1 until the cache cleaner runs every minute } - private static void assertCacheState(Client client, String index, long expectedHits, long expectedMisses) { + protected static void assertCacheState( + Client client, + String index, + long expectedHits, + long expectedMisses, + TierType tierType, + boolean enforceZeroEvictions + ) { RequestCacheStats requestCacheStats = client.admin() .indices() .prepareStats(index) @@ -684,11 +696,35 @@ private static void assertCacheState(Client client, String index, long expectedH .getRequestCache(); // Check the hit count and miss count together so if they are not // correct we can see both values - assertEquals( - Arrays.asList(expectedHits, expectedMisses, 0L), - Arrays.asList(requestCacheStats.getHitCount(), requestCacheStats.getMissCount(), requestCacheStats.getEvictions()) - ); + if (enforceZeroEvictions) { + assertEquals( + Arrays.asList(expectedHits, expectedMisses, 0L), + Arrays.asList( + requestCacheStats.getHitCount(tierType), + requestCacheStats.getMissCount(tierType), + requestCacheStats.getEvictions(tierType) + ) + ); + } else { + assertEquals( + Arrays.asList(expectedHits, expectedMisses), + Arrays.asList(requestCacheStats.getHitCount(tierType), requestCacheStats.getMissCount(tierType)) + ); + } + } + + protected static void assertCacheState(Client client, String index, long expectedHits, long expectedMisses) { + assertCacheState(client, index, expectedHits, expectedMisses, TierType.ON_HEAP, true); + } + protected static void assertNumCacheEntries(Client client, String index, long expectedEntries, TierType tierType) { + RequestCacheStats requestCacheStats = client.admin() + .indices() + .prepareStats(index) + .setRequestCache(true) + .get() + .getTotal() + .getRequestCache(); + assertEquals(expectedEntries, requestCacheStats.getEntries(tierType)); } - } diff --git a/server/src/main/java/org/opensearch/common/cache/tier/CacheValue.java b/server/src/main/java/org/opensearch/common/cache/tier/CacheValue.java new file mode 100644 index 0000000000000..baeac79c172d9 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/tier/CacheValue.java @@ -0,0 +1,42 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.tier; + +import org.opensearch.common.cache.tier.TierType; + +import java.util.Map; + +/** + * Represents a cache value, with its associated tier type where it is stored, + * and tier-specific stats for an individual request stored in a map. + * @param Type of value. 
+ */ +public class CacheValue { + V value; + TierType source; + TierRequestStats stats; + + CacheValue(V value, TierType source, TierRequestStats stats) { + this.value = value; + this.source = source; + this.stats = stats; + } + + public V getValue() { + return value; + } + + public TierType getSource() { + return source; + } + + public TierRequestStats getStats() { + return stats; + } +} diff --git a/server/src/main/java/org/opensearch/common/cache/tier/CachingTier.java b/server/src/main/java/org/opensearch/common/cache/tier/CachingTier.java index 48fd5deadc111..2e94cacd0f40a 100644 --- a/server/src/main/java/org/opensearch/common/cache/tier/CachingTier.java +++ b/server/src/main/java/org/opensearch/common/cache/tier/CachingTier.java @@ -18,7 +18,7 @@ */ public interface CachingTier { - V get(K key); + CacheValue get(K key); void put(K key, V value); diff --git a/server/src/main/java/org/opensearch/common/cache/tier/DiskTierRequestStats.java b/server/src/main/java/org/opensearch/common/cache/tier/DiskTierRequestStats.java new file mode 100644 index 0000000000000..33f36202c3f84 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/tier/DiskTierRequestStats.java @@ -0,0 +1,35 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.tier; + +/** + * A class created by disk tier implementations containing disk-specific stats for a single request. + */ +public class DiskTierRequestStats implements TierRequestStats { + + private final long requestGetTimeNanos; + private final boolean requestReachedDisk; + + public DiskTierRequestStats(long requestGetTimeNanos, boolean requestReachedDisk) { + this.requestReachedDisk = requestReachedDisk; + this.requestGetTimeNanos = requestGetTimeNanos; + } + @Override + public TierType getTierType() { + return TierType.DISK; + } + + public long getRequestGetTimeNanos() { + return requestGetTimeNanos; + } + + public boolean getRequestReachedDisk() { + return requestReachedDisk; + } +} diff --git a/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java b/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java index fad0c5b1f8552..e1c9d972b6d1a 100644 --- a/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java +++ b/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java @@ -11,6 +11,7 @@ import org.ehcache.core.spi.service.FileBasedPersistenceContext; import org.ehcache.spi.serialization.SerializerException; import org.opensearch.OpenSearchException; +import org.opensearch.common.Randomness; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; import org.opensearch.common.cache.RemovalReason; @@ -26,6 +27,8 @@ import java.util.NoSuchElementException; import java.util.Objects; import java.util.Optional; +import java.util.Random; +import java.util.UUID; import java.util.function.Supplier; import org.ehcache.Cache; @@ -50,7 +53,7 @@ public class EhCacheDiskCachingTier implements DiskCachingTier { // A Cache manager can create many caches. 
- private static PersistentCacheManager cacheManager = null; + private PersistentCacheManager cacheManager; // Disk cache private Cache cache; @@ -104,7 +107,7 @@ private EhCacheDiskCachingTier(Builder builder) { this.valueSerializer = Objects.requireNonNull(builder.valueSerializer, "Value serializer shouldn't be null"); this.ehCacheEventListener = new EhCacheEventListener(this.valueSerializer); this.maxWeightInBytes = builder.maxWeightInBytes; - this.storagePath = Objects.requireNonNull(builder.storagePath, "Storage path shouldn't be null"); + this.storagePath = Objects.requireNonNull(builder.storagePath, "Storage path shouldn't be null") + UUID.randomUUID(); // temporary fix: a unique path per instance keeps leftover caches from other nodes or tests from colliding if (builder.threadPoolAlias == null || builder.threadPoolAlias.isBlank()) { this.threadPoolAlias = THREAD_POOL_ALIAS_PREFIX + "DiskWrite"; } else { @@ -119,9 +122,6 @@ private EhCacheDiskCachingTier(Builder builder) { // Default value is 16 within Ehcache. this.DISK_SEGMENTS = Setting.intSetting(builder.settingPrefix + ".ehcache.disk.segments", 16, 1, 32); - // In test cases, there might be leftover cache managers and caches hanging around, from nodes created in the test case setup - // Destroy them before recreating them - close(); cacheManager = buildCacheManager(); this.cache = buildCache(Duration.ofMillis(expireAfterAccess.getMillis()), builder); } @@ -192,8 +192,19 @@ private CacheEventListenerConfigurationBuilder getListenerConfiguration(Builder< } @Override - public V get(K key) { - return valueSerializer.deserialize(cache.get(key)); + public CacheValue get(K key) { + // TODO: Optimize this by adding a key store, so gets for keys that are not on disk can return early. + boolean reachedDisk = true; // TODO: Change this once we combine this with keystore integration + long now = System.nanoTime(); // Nanoseconds required; on an SSD the get can finish in well under a millisecond + + V value = valueSerializer.deserialize(cache.get(key)); + + long tookTime = -1L; // This value will be ignored by the stats accumulator if reachedDisk is false anyway + if (reachedDisk) { + tookTime = System.nanoTime() - now; + } + DiskTierRequestStats stats = new DiskTierRequestStats(tookTime, reachedDisk); + return new CacheValue<>(value, TierType.DISK, stats); } @Override diff --git a/server/src/main/java/org/opensearch/common/cache/tier/OnHeapTierRequestStats.java b/server/src/main/java/org/opensearch/common/cache/tier/OnHeapTierRequestStats.java new file mode 100644 index 0000000000000..8f37f9e8a3141 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/tier/OnHeapTierRequestStats.java @@ -0,0 +1,19 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.tier; + +/** + * A class created by on-heap tier implementations containing on-heap-specific stats for a single request.
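+ * For now the on-heap tier has no tier-specific stats, so this class only reports its tier type.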
+ */ +public class OnHeapTierRequestStats implements TierRequestStats { + @Override + public TierType getTierType() { + return TierType.ON_HEAP; + } +} diff --git a/server/src/main/java/org/opensearch/common/cache/tier/OpenSearchOnHeapCache.java b/server/src/main/java/org/opensearch/common/cache/tier/OpenSearchOnHeapCache.java index 22d2f769507cf..c7cd065674de7 100644 --- a/server/src/main/java/org/opensearch/common/cache/tier/OpenSearchOnHeapCache.java +++ b/server/src/main/java/org/opensearch/common/cache/tier/OpenSearchOnHeapCache.java @@ -66,8 +66,8 @@ public TierType getTierType() { } @Override - public V get(K key) { - return cache.get(key); + public CacheValue get(K key) { + return new CacheValue(cache.get(key), TierType.ON_HEAP, new OnHeapTierRequestStats()); } @Override diff --git a/server/src/main/java/org/opensearch/common/cache/tier/TierRequestStats.java b/server/src/main/java/org/opensearch/common/cache/tier/TierRequestStats.java new file mode 100644 index 0000000000000..d156be7b3f028 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/tier/TierRequestStats.java @@ -0,0 +1,17 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.tier; + +/** + * An interface for single-request tier-specific stats, which are produced on each request from a cache tier + * and then added to the correct shard's overall StatsHolder for the tier. + */ +public interface TierRequestStats { + TierType getTierType(); +} diff --git a/server/src/main/java/org/opensearch/common/cache/tier/TierType.java b/server/src/main/java/org/opensearch/common/cache/tier/TierType.java index ca61b636c1dda..e3c18f6a1b5a3 100644 --- a/server/src/main/java/org/opensearch/common/cache/tier/TierType.java +++ b/server/src/main/java/org/opensearch/common/cache/tier/TierType.java @@ -13,6 +13,18 @@ */ public enum TierType { - ON_HEAP, - DISK; + ON_HEAP("on_heap"), + DISK("disk"); + + private final String stringValue; + + TierType(String stringValue) { + // Associate each TierType with a string representation, for use in API responses and elsewhere + this.stringValue = stringValue; + } + + public String getStringValue() { + return this.stringValue; + } } + diff --git a/server/src/main/java/org/opensearch/common/cache/tier/TieredCacheEventListener.java b/server/src/main/java/org/opensearch/common/cache/tier/TieredCacheEventListener.java index 05b59bf16b282..d1c333e21818e 100644 --- a/server/src/main/java/org/opensearch/common/cache/tier/TieredCacheEventListener.java +++ b/server/src/main/java/org/opensearch/common/cache/tier/TieredCacheEventListener.java @@ -17,11 +17,12 @@ */ public interface TieredCacheEventListener { - void onMiss(K key, TierType tierType); + void onMiss(K key, CacheValue cacheValue); void onRemoval(RemovalNotification notification); - void onHit(K key, V value, TierType tierType); + void onHit(K key, CacheValue cacheValue); void onCached(K key, V value, TierType tierType); + // Since only get() produces a CacheValue with stats, no need to modify onCached or onRemoval to have the CacheValue. 
} diff --git a/server/src/main/java/org/opensearch/common/cache/tier/TieredCacheSpilloverStrategyService.java b/server/src/main/java/org/opensearch/common/cache/tier/TieredCacheSpilloverStrategyService.java index 153dbf9b330f5..f8e037515ae6d 100644 --- a/server/src/main/java/org/opensearch/common/cache/tier/TieredCacheSpilloverStrategyService.java +++ b/server/src/main/java/org/opensearch/common/cache/tier/TieredCacheSpilloverStrategyService.java @@ -164,34 +164,22 @@ private void setRemovalListeners() { private Function> getValueFromTierCache(boolean trackStats) { return key -> { for (CachingTier cachingTier : cachingTierList) { - V value = cachingTier.get(key); - if (value != null) { + CacheValue cacheValue = cachingTier.get(key); + if (cacheValue.value != null) { if (trackStats) { - tieredCacheEventListener.onHit(key, value, cachingTier.getTierType()); + tieredCacheEventListener.onHit(key, cacheValue); } - return new CacheValue<>(value, cachingTier.getTierType()); + return cacheValue; //new CacheValue<>(value, cachingTier.getTierType()); } if (trackStats) { - tieredCacheEventListener.onMiss(key, cachingTier.getTierType()); + tieredCacheEventListener.onMiss(key, cacheValue); } } return null; }; } - /** - * Represents a cache value along with its associated tier type where it is stored. - * @param Type of value. - */ - public static class CacheValue { - V value; - TierType source; - CacheValue(V value, TierType source) { - this.value = value; - this.source = source; - } - } /** * Builder object diff --git a/server/src/main/java/org/opensearch/index/cache/request/RequestCacheStats.java b/server/src/main/java/org/opensearch/index/cache/request/RequestCacheStats.java index 24f68899c2ac7..7de71015e621e 100644 --- a/server/src/main/java/org/opensearch/index/cache/request/RequestCacheStats.java +++ b/server/src/main/java/org/opensearch/index/cache/request/RequestCacheStats.java @@ -32,14 +32,22 @@ package org.opensearch.index.cache.request; +import org.opensearch.OpenSearchException; +import org.opensearch.Version; +import org.opensearch.common.cache.tier.TierRequestStats; +import org.opensearch.common.cache.tier.TierType; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.common.cache.tier.TierType; import java.io.IOException; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.Map; /** * Request for the query cache statistics @@ -48,73 +56,165 @@ */ public class RequestCacheStats implements Writeable, ToXContentFragment { - private long memorySize; - private long evictions; - private long hitCount; - private long missCount; + private Map defaultStatsMap = new HashMap<>(){{ + for (TierType tierType : TierType.values()) { + put(tierType.getStringValue(), new StatsHolder()); + // Every possible tier type must have counters, even if they are disabled. 
Then the counters report 0 + }} + }; + + private Map tierSpecificStatsMap = new HashMap<>(){{ + put(TierType.ON_HEAP.getStringValue(), new ShardRequestCache.OnHeapStatsAccumulator()); + put(TierType.DISK.getStringValue(), new ShardRequestCache.DiskStatsAccumulator()); + }}; public RequestCacheStats() {} public RequestCacheStats(StreamInput in) throws IOException { - memorySize = in.readVLong(); - evictions = in.readVLong(); - hitCount = in.readVLong(); - missCount = in.readVLong(); + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + this.defaultStatsMap = in.readMap(StreamInput::readString, StatsHolder::new); + // Manually fill the tier-specific stats map + this.tierSpecificStatsMap = new HashMap<>(); + tierSpecificStatsMap.put(TierType.ON_HEAP.getStringValue(), new ShardRequestCache.OnHeapStatsAccumulator(in)); + tierSpecificStatsMap.put(TierType.DISK.getStringValue(), new ShardRequestCache.DiskStatsAccumulator(in)); + } else { + // objects from earlier versions only contain on-heap info, and do not have entries info + long memorySize = in.readVLong(); + long evictions = in.readVLong(); + long hitCount = in.readVLong(); + long missCount = in.readVLong(); + this.defaultStatsMap.put(TierType.ON_HEAP.getStringValue(), new StatsHolder(memorySize, evictions, hitCount, missCount, 0)); + } + checkTierSpecificMap(); } - public RequestCacheStats(long memorySize, long evictions, long hitCount, long missCount) { - this.memorySize = memorySize; - this.evictions = evictions; - this.hitCount = hitCount; - this.missCount = missCount; + public RequestCacheStats(Map defaultMap, Map tierSpecificMap) { + // Create a RequestCacheStats with multiple tiers' statistics + // The input maps don't have to have all tiers statistics available + for (TierType tierType : defaultMap.keySet()) { + defaultStatsMap.put(tierType.getStringValue(), defaultMap.get(tierType)); + } + for (TierType tierType : tierSpecificMap.keySet()) { + tierSpecificStatsMap.put(tierType.getStringValue(), tierSpecificMap.get(tierType)); + } + checkTierSpecificMap(); } public void add(RequestCacheStats stats) { - this.memorySize += stats.memorySize; - this.evictions += stats.evictions; - this.hitCount += stats.hitCount; - this.missCount += stats.missCount; + for (TierType tierType : TierType.values()) { + defaultStatsMap.get(tierType.getStringValue()).add(stats.defaultStatsMap.get(tierType.getStringValue())); + tierSpecificStatsMap.get(tierType.getStringValue()).add(stats.tierSpecificStatsMap.get(tierType.getStringValue())); + } + } + + private StatsHolder getTierStats(TierType tierType) { + return defaultStatsMap.get(tierType.getStringValue()); + } + + ShardRequestCache.TierStatsAccumulator getTierSpecificStats(TierType tierType) { // pkg-private for testing + return tierSpecificStatsMap.get(tierType.getStringValue()); + } + + public ShardRequestCache.DiskStatsAccumulator getDiskSpecificStats() { + return (ShardRequestCache.DiskStatsAccumulator) tierSpecificStatsMap.get(TierType.DISK.getStringValue()); + } + + public long getMemorySizeInBytes(TierType tierType) { + return getTierStats(tierType).totalMetric.count(); + } + + public ByteSizeValue getMemorySize(TierType tierType) { + return new ByteSizeValue(getMemorySizeInBytes(tierType)); + } + + public long getEvictions(TierType tierType) { + return getTierStats(tierType).evictionsMetric.count(); } + public long getHitCount(TierType tierType) { + return getTierStats(tierType).hitCount.count(); + } + + public long getMissCount(TierType tierType) { + return 
getTierStats(tierType).missCount.count(); + } + + public long getEntries(TierType tierType) { + return getTierStats(tierType).entries.count(); + } + + // By default, return on-heap stats if no tier is specified + public long getMemorySizeInBytes() { - return this.memorySize; + return getMemorySizeInBytes(TierType.ON_HEAP); } public ByteSizeValue getMemorySize() { - return new ByteSizeValue(memorySize); + return getMemorySize(TierType.ON_HEAP); } public long getEvictions() { - return this.evictions; + return getEvictions(TierType.ON_HEAP); } public long getHitCount() { - return this.hitCount; + return getHitCount(TierType.ON_HEAP); } public long getMissCount() { - return this.missCount; + return getMissCount(TierType.ON_HEAP); + } + + public long getEntries() { + return getEntries(TierType.ON_HEAP); } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeVLong(memorySize); - out.writeVLong(evictions); - out.writeVLong(hitCount); - out.writeVLong(missCount); + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + out.writeMap(this.defaultStatsMap, StreamOutput::writeString, (o, v) -> v.writeTo(o)); // ? + // Manually write tier-specific stats map + tierSpecificStatsMap.get(TierType.ON_HEAP.getStringValue()).writeTo(out); + tierSpecificStatsMap.get(TierType.DISK.getStringValue()).writeTo(out); + } else { + // Write only on-heap values, and don't write entries metric + StatsHolder heapStats = defaultStatsMap.get(TierType.ON_HEAP.getStringValue()); + out.writeVLong(heapStats.getMemorySize()); + out.writeVLong(heapStats.getEvictions()); + out.writeVLong(heapStats.getHitCount()); + out.writeVLong(heapStats.getMissCount()); + } } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(Fields.REQUEST_CACHE_STATS); - builder.humanReadableField(Fields.MEMORY_SIZE_IN_BYTES, Fields.MEMORY_SIZE, getMemorySize()); - builder.field(Fields.EVICTIONS, getEvictions()); - builder.field(Fields.HIT_COUNT, getHitCount()); - builder.field(Fields.MISS_COUNT, getMissCount()); + // write on-heap stats outside of tiers object + getTierStats(TierType.ON_HEAP).toXContent(builder, params); + getTierSpecificStats(TierType.ON_HEAP).toXContent(builder, params); + builder.startObject(Fields.TIERS); + for (TierType tierType : TierType.values()) { // fixed order + if (tierType != TierType.ON_HEAP) { + String tier = tierType.getStringValue(); + builder.startObject(tier); + defaultStatsMap.get(tier).toXContent(builder, params); + tierSpecificStatsMap.get(tier).toXContent(builder, params); + builder.endObject(); + } + } + builder.endObject(); builder.endObject(); return builder; } + private void checkTierSpecificMap() { + for (TierType tierType : TierType.values()) { + if (tierSpecificStatsMap.get(tierType.getStringValue()) == null) { + throw new OpenSearchException("Missing TierStatsAccumulator for TierType " + tierType.getStringValue()); + } + } + } + /** * Fields used for parsing and toXContent * @@ -122,10 +222,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws */ static final class Fields { static final String REQUEST_CACHE_STATS = "request_cache"; + static final String TIERS = "tiers"; static final String MEMORY_SIZE = "memory_size"; static final String MEMORY_SIZE_IN_BYTES = "memory_size_in_bytes"; static final String EVICTIONS = "evictions"; static final String HIT_COUNT = "hit_count"; static final String MISS_COUNT = "miss_count"; + static final String ENTRIES = "entries"; } } diff --git 
a/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java b/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java index 795d585d88647..96a04768e3805 100644 --- a/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java +++ b/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java @@ -33,8 +33,22 @@ package org.opensearch.index.cache.request; import org.apache.lucene.util.Accountable; +import org.opensearch.OpenSearchException; +import org.opensearch.common.cache.tier.CacheValue; +import org.opensearch.common.cache.tier.DiskTierRequestStats; +import org.opensearch.common.cache.tier.OnHeapTierRequestStats; +import org.opensearch.common.cache.tier.TierRequestStats; +import org.opensearch.common.cache.tier.TierType; import org.opensearch.common.metrics.CounterMetric; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.EnumMap; /** * Tracks the portion of the request cache in use for a particular shard. @@ -43,31 +57,48 @@ */ public final class ShardRequestCache { - final CounterMetric evictionsMetric = new CounterMetric(); - final CounterMetric totalMetric = new CounterMetric(); - final CounterMetric hitCount = new CounterMetric(); - final CounterMetric missCount = new CounterMetric(); + // Holds stats common to all tiers + private final EnumMap defaultStatsHolder = new EnumMap<>(TierType.class); + + // Holds tier-specific stats + private final EnumMap tierSpecificStatsHolder = new EnumMap<>(TierType.class); + + public ShardRequestCache() { + tierSpecificStatsHolder.put(TierType.ON_HEAP, new OnHeapStatsAccumulator()); + tierSpecificStatsHolder.put(TierType.DISK, new DiskStatsAccumulator()); + for (TierType tierType : TierType.values()) { + defaultStatsHolder.put(tierType, new StatsHolder()); + if (tierSpecificStatsHolder.get(tierType) == null) { + throw new OpenSearchException("Missing TierStatsAccumulator for TierType " + tierType.getStringValue()); + } + } + } public RequestCacheStats stats() { - // TODO: Change RequestCacheStats to support disk tier stats. 
- return new RequestCacheStats(totalMetric.count(), evictionsMetric.count(), hitCount.count(), missCount.count()); + return new RequestCacheStats(defaultStatsHolder, tierSpecificStatsHolder); } - public void onHit() { - hitCount.inc(); + public void onHit(CacheValue cacheValue) { + TierType source = cacheValue.getSource(); + defaultStatsHolder.get(source).hitCount.inc(); + tierSpecificStatsHolder.get(source).addRequestStats(cacheValue.getStats()); } - public void onMiss() { - missCount.inc(); + public void onMiss(CacheValue cacheValue) { + TierType source = cacheValue.getSource(); + defaultStatsHolder.get(source).missCount.inc(); + tierSpecificStatsHolder.get(source).addRequestStats(cacheValue.getStats()); } - public void onCached(Accountable key, BytesReference value) { - totalMetric.inc(key.ramBytesUsed() + value.ramBytesUsed()); + public void onCached(Accountable key, BytesReference value, TierType tierType) { + defaultStatsHolder.get(tierType).totalMetric.inc(key.ramBytesUsed() + value.ramBytesUsed()); + defaultStatsHolder.get(tierType).entries.inc(); } - public void onRemoval(Accountable key, BytesReference value, boolean evicted) { + public void onRemoval(Accountable key, BytesReference value, boolean evicted, TierType tierType) { + if (evicted) { - evictionsMetric.inc(); + defaultStatsHolder.get(tierType).evictionsMetric.inc(); } long dec = 0; if (key != null) { @@ -76,6 +107,118 @@ public void onRemoval(Accountable key, BytesReference value, boolean evicted) { if (value != null) { dec += value.ramBytesUsed(); } - totalMetric.dec(dec); + defaultStatsHolder.get(tierType).totalMetric.dec(dec); + defaultStatsHolder.get(tierType).entries.dec(); + } + + /** + * An abstract class whose extending classes accumulate tier-specific stats. + * All extending classes should provide a constructor like TierStatsAccumulator(StreamInput in), + * as well as a no-argument constructor. + * @param The tier-specific implementation of TierRequestStats to use + */ + static abstract class TierStatsAccumulator implements Writeable, ToXContentFragment { + /** + * Add new stats from a single request to this accumulator. + * @param stats The stats from a single request to add + */ + public abstract void addRequestStats(S stats); + + /** + * Add the stats from another TierStatsAccumulator to this TierStatsAccumulator. + * Used when combining stats across multiple shards. + * @param other The other TierStatsAccumulator. + */ + public abstract void add(TierStatsAccumulator other); + } + + /** + * This class accumulates on-heap-tier-specific stats for a single shard. + * For now, the on-heap tier has no unique stats, but future stats would be added here. + */ + public static class OnHeapStatsAccumulator extends TierStatsAccumulator { + OnHeapStatsAccumulator() {} + OnHeapStatsAccumulator(StreamInput in) {} + @Override + public void addRequestStats(OnHeapTierRequestStats stats) {} + + @Override + public void add(TierStatsAccumulator other) {} + + @Override + public void writeTo(StreamOutput out) throws IOException {} + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder; + } + } + + /** + * This class accumulates disk-tier-specific stats for a single shard.
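+ * It tracks how many get() calls actually reached the disk and the total time in nanoseconds spent on those gets.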
+ */ + public static class DiskStatsAccumulator extends TierStatsAccumulator { + final CounterMetric totalGetTime; + final CounterMetric totalDiskReaches; // Number of times a get() has actually reached the disk + public DiskStatsAccumulator() { + this.totalGetTime = new CounterMetric(); + this.totalDiskReaches = new CounterMetric(); + } + + public DiskStatsAccumulator(long totalGetTime, long totalDiskReaches) { + this.totalGetTime = new CounterMetric(); + this.totalGetTime.inc(totalGetTime); + this.totalDiskReaches = new CounterMetric(); + this.totalDiskReaches.inc(totalDiskReaches); + } + + public DiskStatsAccumulator(StreamInput in) throws IOException { + this( + in.readVLong(), + in.readVLong() + ); + } + + public long getTotalGetTime() { + return totalGetTime.count(); + } + + public long getTotalDiskReaches() { + return totalDiskReaches.count(); + } + + @Override + public void addRequestStats(DiskTierRequestStats stats) { + if (stats.getRequestReachedDisk()) { + this.totalDiskReaches.inc(); + this.totalGetTime.inc(stats.getRequestGetTimeNanos()); + } + } + + @Override + public void add(TierStatsAccumulator other) { + assert other.getClass() == DiskStatsAccumulator.class; + DiskStatsAccumulator castOther = (DiskStatsAccumulator) other; + this.totalDiskReaches.inc(castOther.totalDiskReaches.count()); + this.totalGetTime.inc(castOther.totalGetTime.count()); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(totalGetTime.count()); + out.writeVLong(totalDiskReaches.count()); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(Fields.TOTAL_GET_TIME, getTotalGetTime()); + builder.field(Fields.TOTAL_DISK_REACHES, getTotalDiskReaches()); + return builder; + } + + static final class Fields { // Used for field names in API response + static final String TOTAL_GET_TIME = "total_get_time_nanos"; + static final String TOTAL_DISK_REACHES = "total_disk_reaches"; + } } } diff --git a/server/src/main/java/org/opensearch/index/cache/request/StatsHolder.java b/server/src/main/java/org/opensearch/index/cache/request/StatsHolder.java new file mode 100644 index 0000000000000..300689cf09dff --- /dev/null +++ b/server/src/main/java/org/opensearch/index/cache/request/StatsHolder.java @@ -0,0 +1,118 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.cache.request; + +import org.opensearch.common.cache.tier.TierRequestStats; +import org.opensearch.common.cache.tier.TierType; +import org.opensearch.common.metrics.CounterMetric; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +/** + * The basic StatsHolder class, which accumulates shard-level stats that are common across all tiers. + * Used in ShardRequestCache. Extending classes also handle tier-specific stats for each tier. 
+ */ +public class StatsHolder implements Serializable, Writeable, ToXContentFragment { + final CounterMetric totalMetric; + final CounterMetric evictionsMetric; + final CounterMetric hitCount; + final CounterMetric missCount; + final CounterMetric entries; + + public StatsHolder() { + this.totalMetric = new CounterMetric(); + this.evictionsMetric = new CounterMetric(); + this.hitCount = new CounterMetric(); + this.missCount = new CounterMetric(); + this.entries = new CounterMetric(); + } + + public StatsHolder(long memorySize, long evictions, long hitCount, long missCount, long entries) { + // Switched argument order to match RequestCacheStats + this.totalMetric = new CounterMetric(); + this.totalMetric.inc(memorySize); + this.evictionsMetric = new CounterMetric(); + this.evictionsMetric.inc(evictions); + this.hitCount = new CounterMetric(); + this.hitCount.inc(hitCount); + this.missCount = new CounterMetric(); + this.missCount.inc(missCount); + this.entries = new CounterMetric(); + this.entries.inc(entries); + } + + public StatsHolder(StreamInput in) throws IOException { + // Read and write the values of the counter metrics. They should always be positive + // This object is new, so we shouldn't need version checks for different behavior + this(in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong()); + // java forces us to do this in one line + // guaranteed to be evaluated in correct order (https://docs.oracle.com/javase/specs/jls/se7/html/jls-15.html#jls-15.7.4) + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(totalMetric.count()); + out.writeVLong(evictionsMetric.count()); + out.writeVLong(hitCount.count()); + out.writeVLong(missCount.count()); + out.writeVLong(entries.count()); + } + + public void add(StatsHolder otherStats) { + // Add the argument's metrics to this object's metrics. 
+ totalMetric.inc(otherStats.totalMetric.count()); + evictionsMetric.inc(otherStats.evictionsMetric.count()); + hitCount.inc(otherStats.hitCount.count()); + missCount.inc(otherStats.missCount.count()); + entries.inc(otherStats.entries.count()); + } + + public long getEvictions() { + return evictionsMetric.count(); + } + + public long getMemorySize() { + return totalMetric.count(); + } + + public long getHitCount() { + return hitCount.count(); + } + + public long getMissCount() { + return missCount.count(); + } + + public long getEntries() { + return entries.count(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.humanReadableField( + RequestCacheStats.Fields.MEMORY_SIZE_IN_BYTES, + RequestCacheStats.Fields.MEMORY_SIZE, + new ByteSizeValue(getMemorySize()) + ); + builder.field(RequestCacheStats.Fields.EVICTIONS, getEvictions()); + builder.field(RequestCacheStats.Fields.HIT_COUNT, getHitCount()); + builder.field(RequestCacheStats.Fields.MISS_COUNT, getMissCount()); + builder.field(RequestCacheStats.Fields.ENTRIES, getEntries()); + return builder; + } +} diff --git a/server/src/main/java/org/opensearch/indices/AbstractIndexShardCacheEntity.java b/server/src/main/java/org/opensearch/indices/AbstractIndexShardCacheEntity.java index 81d4f545e0fd9..ae33c0a90a7e5 100644 --- a/server/src/main/java/org/opensearch/indices/AbstractIndexShardCacheEntity.java +++ b/server/src/main/java/org/opensearch/indices/AbstractIndexShardCacheEntity.java @@ -34,6 +34,7 @@ import org.opensearch.common.cache.RemovalNotification; import org.opensearch.common.cache.RemovalReason; +import org.opensearch.common.cache.tier.CacheValue; import org.opensearch.common.cache.tier.TierType; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.index.cache.request.ShardRequestCache; @@ -53,25 +54,27 @@ abstract class AbstractIndexShardCacheEntity implements IndicesRequestCache.Cach @Override public final void onCached(IndicesRequestCache.Key key, BytesReference value, TierType tierType) { - // TODO: Handle tierType in stats - stats().onCached(key, value); + stats().onCached(key, value, tierType); } @Override - public final void onHit(TierType tierType) { - // TODO: Handle tierType in stats - stats().onHit(); + public final void onHit(CacheValue cacheValue) { + stats().onHit(cacheValue); } @Override - public final void onMiss(TierType tierType) { - // TODO: Handle tierType in stats - stats().onMiss(); + public final void onMiss(CacheValue cacheValue) { + stats().onMiss(cacheValue); } @Override public final void onRemoval(RemovalNotification notification) { // TODO: Handle tierType in stats - stats().onRemoval(notification.getKey(), notification.getValue(), notification.getRemovalReason() == RemovalReason.EVICTED); + stats().onRemoval( + notification.getKey(), + notification.getValue(), + notification.getRemovalReason() == RemovalReason.EVICTED, + notification.getTierType() + ); } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index bddd2ae3103c0..cb1c94fbd7e29 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -40,6 +40,9 @@ import org.apache.lucene.util.RamUsageEstimator; import org.opensearch.common.CheckedSupplier; import org.opensearch.common.cache.RemovalNotification; +import 
org.opensearch.common.cache.tier.BytesReferenceSerializer; +import org.opensearch.common.cache.tier.CacheValue; +import org.opensearch.common.cache.tier.EhCacheDiskCachingTier; import org.opensearch.common.cache.tier.OnHeapCachingTier; import org.opensearch.common.cache.tier.OpenSearchOnHeapCache; import org.opensearch.common.cache.tier.TierType; @@ -115,29 +118,39 @@ public final class IndicesRequestCache implements TieredCacheEventListener tieredCacheService; private final IndicesService indicesService; + private final Settings settings; IndicesRequestCache(Settings settings, IndicesService indicesService) { this.size = INDICES_CACHE_QUERY_SIZE.get(settings); this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null; long sizeInBytes = size.getBytes(); this.indicesService = indicesService; + this.settings = settings; // Initialize onHeap cache tier first. OnHeapCachingTier openSearchOnHeapCache = new OpenSearchOnHeapCache.Builder().setWeigher( (k, v) -> k.ramBytesUsed() + v.ramBytesUsed() ).setMaximumWeight(sizeInBytes).setExpireAfterAccess(expire).build(); - // Initialize tiered cache service. TODO: Enable Disk tier when tiered support is turned on. - tieredCacheService = new TieredCacheSpilloverStrategyService.Builder() - .setOnHeapCachingTier(openSearchOnHeapCache) - .setTieredCacheEventListener(this) - .build(); + // Initialize tiered cache service. + TieredCacheSpilloverStrategyService.Builder tieredCacheServiceBuilder = + new TieredCacheSpilloverStrategyService.Builder() + .setOnHeapCachingTier(openSearchOnHeapCache) + .setTieredCacheEventListener(this); + + + EhCacheDiskCachingTier ehcacheDiskTier = createNewDiskTier(); + tieredCacheServiceBuilder.setOnDiskCachingTier(ehcacheDiskTier); + tieredCacheService = tieredCacheServiceBuilder.build(); } @Override public void close() { tieredCacheService.invalidateAll(); + if (tieredCacheService.getDiskCachingTier().isPresent()) { + tieredCacheService.getDiskCachingTier().get().close(); + } } void clear(CacheEntity entity) { @@ -146,8 +159,8 @@ void clear(CacheEntity entity) { } @Override - public void onMiss(Key key, TierType tierType) { - key.entity.onMiss(tierType); + public void onMiss(Key key, CacheValue cacheValue) { + key.entity.onMiss(cacheValue); } @Override @@ -156,8 +169,8 @@ public void onRemoval(RemovalNotification notification) { } @Override - public void onHit(Key key, BytesReference value, TierType tierType) { - key.entity.onHit(tierType); + public void onHit(Key key, CacheValue cacheValue) { + key.entity.onHit(cacheValue); } @Override @@ -191,9 +204,6 @@ BytesReference getOrCompute( } } } - // else { - // key.entity.onHit(); - // } return value; } @@ -219,7 +229,6 @@ void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference * @opensearch.internal */ private static class Loader implements TieredCacheLoader { - private final CacheEntity entity; private final CheckedSupplier loader; private boolean loaded; @@ -266,12 +275,12 @@ interface CacheEntity extends Accountable, Writeable { /** * Called each time this entity has a cache hit. */ - void onHit(TierType tierType); + void onHit(CacheValue cacheValue); /** * Called each time this entity has a cache miss. 
*/ - void onMiss(TierType tierType); + void onMiss(CacheValue cacheValue); /** * Called when this entity instance is removed @@ -285,7 +294,7 @@ interface CacheEntity extends Accountable, Writeable { * * @opensearch.internal */ - class Key implements Accountable, Writeable { + class Key implements Accountable, Writeable { private final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class); public final CacheEntity entity; // use as identity equality @@ -422,4 +431,28 @@ long count() { int numRegisteredCloseListeners() { // for testing return registeredClosedListeners.size(); } + + /** + * Creates a new disk tier instance. Should only be run if the instance will actually be used! + * Otherwise, it may not be closed properly. + * @return A new disk tier instance + */ + public EhCacheDiskCachingTier createNewDiskTier() { + //assert FeatureFlags.isEnabled(FeatureFlags.TIERED_CACHING); + long CACHE_SIZE_IN_BYTES = 10000000L; //INDICES_CACHE_DISK_TIER_SIZE.get(settings).getBytes(); + String STORAGE_PATH = indicesService.getNodePaths()[0].indicesPath.toString() + "/request_cache"; + + return new EhCacheDiskCachingTier.Builder() + .setKeyType(Key.class) + .setValueType(BytesReference.class) + .setExpireAfterAccess(TimeValue.MAX_VALUE) // TODO: Is this meant to be the same as IRC expire or different? + .setThreadPoolAlias("ehcacheThreadpool") + .setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES) + .setStoragePath(STORAGE_PATH) + .setKeySerializer(new IRCKeyWriteableSerializer(this)) + .setValueSerializer(new BytesReferenceSerializer()) + .setSettings(settings) + .setSettingPrefix("indices.requests.tier") + .build(); + } } diff --git a/server/src/main/resources/org/opensearch/bootstrap/security.policy b/server/src/main/resources/org/opensearch/bootstrap/security.policy index 5d588bb9bf1fd..a509ecb544546 100644 --- a/server/src/main/resources/org/opensearch/bootstrap/security.policy +++ b/server/src/main/resources/org/opensearch/bootstrap/security.policy @@ -191,7 +191,13 @@ grant { // For ehcache permission java.lang.RuntimePermission "createClassLoader"; permission java.lang.RuntimePermission "accessClassInPackage.sun.misc"; - + permission java.lang.RuntimePermission "getenv.*"; permission java.lang.RuntimePermission "accessDeclaredMembers"; permission java.lang.reflect.ReflectPermission "suppressAccessChecks"; + permission java.io.FilePermission "disk_cache_tier", "read"; // change this to wherever we will put disk tier folder + permission java.io.FilePermission "disk_cache_tier", "write"; + permission java.io.FilePermission "disk_cache_tier", "delete"; + permission java.io.FilePermission "disk_cache_tier/-", "read"; + permission java.io.FilePermission "disk_cache_tier/-", "write"; + permission java.io.FilePermission "disk_cache_tier/-", "delete"; }; diff --git a/server/src/test/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTierTests.java b/server/src/test/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTierTests.java index f443af615d8ec..afa03cc9aab24 100644 --- a/server/src/test/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTierTests.java +++ b/server/src/test/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTierTests.java @@ -58,7 +58,11 @@ public void testBasicGetAndPut() throws IOException { ehCacheDiskCachingTierNew.put(entry.getKey(), entry.getValue()); } for (Map.Entry entry : keyValueMap.entrySet()) { - String value = ehCacheDiskCachingTierNew.get(entry.getKey()); + CacheValue value = ehCacheDiskCachingTierNew.get(entry.getKey()); + 
assertEquals(entry.getValue(), value.value); + assertEquals(TierType.DISK, value.getSource()); + assertTrue(((DiskTierRequestStats) value.getStats()).getRequestReachedDisk()); + assertTrue(((DiskTierRequestStats) value.getStats()).getRequestGetTimeNanos() > 0); } ehCacheDiskCachingTierNew.close(); } @@ -92,7 +96,7 @@ public void testBasicGetAndPutBytesReference() throws Exception { ehCacheDiskCachingTier.put(entry.getKey(), entry.getValue()); } for (Map.Entry entry : keyValueMap.entrySet()) { - BytesReference value = ehCacheDiskCachingTier.get(entry.getKey()); + BytesReference value = ehCacheDiskCachingTier.get(entry.getKey()).value; assertEquals(entry.getValue(), value); } ehCacheDiskCachingTier.close(); @@ -135,8 +139,8 @@ public void testConcurrentPut() throws Exception { phaser.arriveAndAwaitAdvance(); // Will trigger parallel puts above. countDownLatch.await(); // Wait for all threads to finish for (Map.Entry entry : keyValueMap.entrySet()) { - String value = ehCacheDiskCachingTierNew.get(entry.getKey()); - assertEquals(entry.getValue(), value); + CacheValue value = ehCacheDiskCachingTierNew.get(entry.getKey()); + assertEquals(entry.getValue(), value.value); } ehCacheDiskCachingTierNew.close(); } @@ -175,7 +179,7 @@ public void testEhcacheParallelGets() throws Exception { for (Map.Entry entry : keyValueMap.entrySet()) { threads[j] = new Thread(() -> { phaser.arriveAndAwaitAdvance(); - assertEquals(entry.getValue(), ehCacheDiskCachingTierNew.get(entry.getKey())); + assertEquals(entry.getValue(), ehCacheDiskCachingTierNew.get(entry.getKey()).value); countDownLatch.countDown(); }); threads[j].start(); diff --git a/server/src/test/java/org/opensearch/common/cache/tier/TieredCacheSpilloverStrategyServiceTests.java b/server/src/test/java/org/opensearch/common/cache/tier/TieredCacheSpilloverStrategyServiceTests.java index bb7a22cc26037..a85d82118ff66 100644 --- a/server/src/test/java/org/opensearch/common/cache/tier/TieredCacheSpilloverStrategyServiceTests.java +++ b/server/src/test/java/org/opensearch/common/cache/tier/TieredCacheSpilloverStrategyServiceTests.java @@ -254,8 +254,8 @@ class MockOnHeapCacheTier implements OnHeapCachingTier, RemovalListe } @Override - public V get(K key) { - return this.onHeapCacheTier.get(key); + public CacheValue get(K key) { + return new CacheValue(this.onHeapCacheTier.get(key), TierType.ON_HEAP, new OnHeapTierRequestStats()); } @Override @@ -340,8 +340,8 @@ class MockTieredCacheEventListener implements TieredCacheEventListener cacheValue) { + enumMap.get(cacheValue.getSource()).missCount.inc(); } @Override @@ -352,8 +352,8 @@ public void onRemoval(RemovalNotification notification) { } @Override - public void onHit(K key, V value, TierType tierType) { - enumMap.get(tierType).hitCount.inc(); + public void onHit(K key, CacheValue cacheValue) { + enumMap.get(cacheValue.getSource()).hitCount.inc(); } @Override @@ -381,8 +381,8 @@ class MockDiskCachingTier implements DiskCachingTier, RemovalListene } @Override - public V get(K key) { - return this.diskTier.get(key); + public CacheValue get(K key) { + return new CacheValue<>(this.diskTier.get(key), TierType.DISK, new DiskTierRequestStats(0L, true)); } @Override diff --git a/server/src/test/java/org/opensearch/index/cache/request/RequestCacheStatsTests.java b/server/src/test/java/org/opensearch/index/cache/request/RequestCacheStatsTests.java new file mode 100644 index 0000000000000..b481cf762f9ec --- /dev/null +++ b/server/src/test/java/org/opensearch/index/cache/request/RequestCacheStatsTests.java @@ -0,0 +1,97 
@@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.cache.request; + +import org.opensearch.common.cache.tier.TierType; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.BytesStreamInput; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.HashMap; +import java.util.Map; + +public class RequestCacheStatsTests extends OpenSearchTestCase { + public void testConstructorsAndAdd() throws Exception { + RequestCacheStats emptyStats = new RequestCacheStats(); + for (TierType tierType : TierType.values()) { + assertTierState(emptyStats, tierType, 0, 0, 0, 0, 0); + } + assertDiskStatsState(emptyStats, 0, 0); + Map testHeapMap = new HashMap<>(); + testHeapMap.put(TierType.ON_HEAP, new StatsHolder(1, 2, 3, 4, 5)); + Map tierSpecificMap = new HashMap<>(); + tierSpecificMap.put(TierType.DISK, new ShardRequestCache.DiskStatsAccumulator(6, 7)); + RequestCacheStats heapAndSpecificOnlyStats = new RequestCacheStats(testHeapMap, tierSpecificMap); + for (TierType tierType : TierType.values()) { + if (tierType == TierType.ON_HEAP) { + assertTierState(heapAndSpecificOnlyStats, tierType, 1, 2, 3, 4, 5); + } else { + assertTierState(heapAndSpecificOnlyStats, tierType, 0, 0, 0, 0, 0); + } + } + assertDiskStatsState(heapAndSpecificOnlyStats, 6, 7); + + Map testBothTiersMap = new HashMap<>(); + testBothTiersMap.put(TierType.ON_HEAP, new StatsHolder(11, 12, 13, 14, 15)); + testBothTiersMap.put(TierType.DISK, new StatsHolder(6, 7, 8, 9, 10)); + Map newTierSpecificMap = new HashMap<>(); + newTierSpecificMap.put(TierType.ON_HEAP, new ShardRequestCache.OnHeapStatsAccumulator()); + newTierSpecificMap.put(TierType.DISK, new ShardRequestCache.DiskStatsAccumulator(8, 9)); + RequestCacheStats bothTiersStats = new RequestCacheStats(testBothTiersMap, newTierSpecificMap); + assertTierState(bothTiersStats, TierType.ON_HEAP, 11, 12, 13, 14, 15); + assertTierState(bothTiersStats, TierType.DISK, 6, 7, 8, 9, 10); + + bothTiersStats.add(heapAndSpecificOnlyStats); + assertTierState(bothTiersStats, TierType.ON_HEAP, 12, 14, 16, 18, 20); + assertTierState(bothTiersStats, TierType.DISK, 6, 7, 8, 9, 10); + assertDiskStatsState(bothTiersStats, 14, 16); + } + + public void testSerialization() throws Exception { + // This test also implicitly tests StatsHolder serialization + BytesStreamOutput os = new BytesStreamOutput(); + + Map testMap = new HashMap<>(); + testMap.put(TierType.ON_HEAP, new StatsHolder(11, 12, 13, 14, 15)); + testMap.put(TierType.DISK, new StatsHolder(6, 7, 8, 9, 10)); + Map tierSpecificMap = new HashMap<>(); + tierSpecificMap.put(TierType.ON_HEAP, new ShardRequestCache.OnHeapStatsAccumulator()); + tierSpecificMap.put(TierType.DISK, new ShardRequestCache.DiskStatsAccumulator(20, 21)); + RequestCacheStats stats = new RequestCacheStats(testMap, tierSpecificMap); + stats.writeTo(os); + BytesStreamInput is = new BytesStreamInput(BytesReference.toBytes(os.bytes())); + RequestCacheStats deserialized = new RequestCacheStats(is); + + assertTierState(deserialized, TierType.ON_HEAP, 11, 12, 13, 14, 15); + assertTierState(deserialized, TierType.DISK, 6, 7, 8, 9, 10); + assertDiskStatsState(deserialized, 20, 21); + } + + private void assertTierState( + RequestCacheStats stats, + TierType tierType, + long memSize, + long 
evictions, + long hitCount, + long missCount, + long entries + ) { + assertEquals(memSize, stats.getMemorySizeInBytes(tierType)); + assertEquals(evictions, stats.getEvictions(tierType)); + assertEquals(hitCount, stats.getHitCount(tierType)); + assertEquals(missCount, stats.getMissCount(tierType)); + assertEquals(entries, stats.getEntries(tierType)); + } + + private void assertDiskStatsState(RequestCacheStats stats, long totalGetTime, long totalDiskReaches) { + assertEquals(totalGetTime, ((ShardRequestCache.DiskStatsAccumulator) stats.getTierSpecificStats(TierType.DISK)).getTotalGetTime()); + assertEquals(totalDiskReaches, ((ShardRequestCache.DiskStatsAccumulator) stats.getTierSpecificStats(TierType.DISK)).getTotalDiskReaches()); + } +} diff --git a/server/src/test/java/org/opensearch/index/cache/request/ShardRequestCacheTests.java b/server/src/test/java/org/opensearch/index/cache/request/ShardRequestCacheTests.java new file mode 100644 index 0000000000000..8b97924343152 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/cache/request/ShardRequestCacheTests.java @@ -0,0 +1,40 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.cache.request; + + +import org.opensearch.common.cache.tier.DiskTierRequestStats; +import org.opensearch.test.OpenSearchTestCase; + +public class ShardRequestCacheTests extends OpenSearchTestCase { + // Serialization and getter logic is implicitly tested in RequestCacheStatsTests.java, + // in this file, check logic for StatsHolder.TierStatsAccumulator implementations + + public void testInit() throws Exception { + ShardRequestCache src = new ShardRequestCache(); + RequestCacheStats rcs = src.stats(); + } + public void testDiskStatsAccumulator() throws Exception { + ShardRequestCache.DiskStatsAccumulator acc = new ShardRequestCache.DiskStatsAccumulator(); + DiskTierRequestStats reachedDiskReqStats = new DiskTierRequestStats(145L, true); + acc.addRequestStats(reachedDiskReqStats); + assertEquals(1, acc.getTotalDiskReaches()); + assertEquals(145, acc.getTotalGetTime()); + DiskTierRequestStats noDiskReqStats = new DiskTierRequestStats(391392L, false); + acc.addRequestStats(noDiskReqStats); + assertEquals(1, acc.getTotalDiskReaches()); + assertEquals(145, acc.getTotalGetTime()); + + ShardRequestCache.DiskStatsAccumulator other = new ShardRequestCache.DiskStatsAccumulator(); + other.addRequestStats(new DiskTierRequestStats(1L, true)); + acc.add(other); + assertEquals(146, acc.getTotalGetTime()); + assertEquals(2, acc.getTotalDiskReaches()); + } +} diff --git a/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java b/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java index 5dd4eb504ec2f..accd7a29efb43 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java @@ -37,6 +37,7 @@ import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.routing.allocation.DiskThresholdSettings; import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.tier.CacheValue; import org.opensearch.common.cache.tier.TierType; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.OpenSearchExecutors; @@ -342,10 +343,10 @@ public Object getCacheIdentity() { } @Override 
- public void onHit(TierType tierType) {} + public void onHit(CacheValue cacheValue) {} @Override - public void onMiss(TierType tierType) {} + public void onMiss(CacheValue cacheValue) {} @Override public void onRemoval(RemovalNotification notification) {}
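For reference, a minimal sketch of how a caller might read the per-tier stats introduced in this change. It only uses the public getters added above in RequestCacheStats and ShardRequestCache; the wrapper class and method names below are illustrative and not part of the PR.

import org.opensearch.common.cache.tier.TierType;
import org.opensearch.index.cache.request.RequestCacheStats;
import org.opensearch.index.cache.request.ShardRequestCache;

public class TierStatsExample {
    // Walks every tier and prints the common counters, then the disk-only accumulators.
    public static void printTierStats(RequestCacheStats stats) {
        for (TierType tierType : TierType.values()) {
            System.out.println(
                tierType.getStringValue()
                    + ": hits=" + stats.getHitCount(tierType)
                    + " misses=" + stats.getMissCount(tierType)
                    + " evictions=" + stats.getEvictions(tierType)
                    + " entries=" + stats.getEntries(tierType)
                    + " bytes=" + stats.getMemorySizeInBytes(tierType)
            );
        }
        // Disk-specific stats: how many gets actually touched the disk, and the total nanoseconds they took.
        ShardRequestCache.DiskStatsAccumulator diskStats = stats.getDiskSpecificStats();
        System.out.println("disk reaches=" + diskStats.getTotalDiskReaches()
            + ", total get time (ns)=" + diskStats.getTotalGetTime());
    }
}

Only the disk tier currently carries extra per-tier accumulators; the on-heap accumulator is an empty placeholder, so every tier still serializes and renders consistently.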