|
45 | 45 | import org.apache.lucene.search.TopDocs; |
46 | 46 | import org.apache.lucene.store.Directory; |
47 | 47 | import org.apache.lucene.util.BytesRef; |
| 48 | +import org.opensearch.cluster.metadata.IndexMetadata; |
48 | 49 | import org.opensearch.common.CheckedSupplier; |
49 | 50 | import org.opensearch.common.cache.ICacheKey; |
50 | 51 | import org.opensearch.common.cache.RemovalNotification; |
51 | 52 | import org.opensearch.common.cache.RemovalReason; |
52 | 53 | import org.opensearch.common.cache.module.CacheModule; |
53 | 54 | import org.opensearch.common.cache.service.CacheService; |
| 55 | +import org.opensearch.common.cache.stats.CacheStatsCounterSnapshot; |
| 56 | +import org.opensearch.common.cache.stats.MultiDimensionCacheStats; |
54 | 57 | import org.opensearch.common.io.stream.BytesStreamOutput; |
55 | 58 | import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; |
56 | 59 | import org.opensearch.common.settings.Settings; |
|
70 | 73 | import org.opensearch.index.query.TermQueryBuilder; |
71 | 74 | import org.opensearch.index.shard.IndexShard; |
72 | 75 | import org.opensearch.index.shard.IndexShardState; |
| 76 | +import org.opensearch.index.shard.ShardNotFoundException; |
73 | 77 | import org.opensearch.node.Node; |
74 | 78 | import org.opensearch.test.OpenSearchSingleNodeTestCase; |
75 | 79 | import org.opensearch.threadpool.ThreadPool; |
76 | 80 |
|
77 | 81 | import java.io.IOException; |
78 | 82 | import java.util.ArrayList; |
79 | 83 | import java.util.Arrays; |
| 84 | +import java.util.List; |
80 | 85 | import java.util.Optional; |
81 | 86 | import java.util.UUID; |
82 | 87 | import java.util.concurrent.atomic.AtomicInteger; |
@@ -753,6 +758,117 @@ public void testCacheCleanupBasedOnStaleThreshold_StalenessLesserThanThreshold() |
753 | 758 | terminate(threadPool); |
754 | 759 | } |
755 | 760 |
|
| 761 | + public void testClosingIndexWipesStats() throws Exception { |
| 762 | + IndicesService indicesService = getInstanceFromNode(IndicesService.class); |
| 763 | + // Create two indices each with multiple shards |
| 764 | + int numShards = 3; |
| 765 | + Settings indexSettings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards).build(); |
| 766 | + String indexToKeepName = "test"; |
| 767 | + String indexToCloseName = "test2"; |
| 768 | + IndexService indexToKeep = createIndex(indexToKeepName, indexSettings); |
| 769 | + IndexService indexToClose = createIndex(indexToCloseName, indexSettings); |
| 770 | + for (int i = 0; i < numShards; i++) { |
| 771 | + // Check we can get all the shards we expect |
| 772 | + assertNotNull(indexToKeep.getShard(i)); |
| 773 | + assertNotNull(indexToClose.getShard(i)); |
| 774 | + } |
| 775 | + ThreadPool threadPool = getThreadPool(); |
| 776 | + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.001%").build(); |
| 777 | + IndicesRequestCache cache = new IndicesRequestCache(settings, (shardId -> { |
| 778 | + IndexService indexService = null; |
| 779 | + try { |
| 780 | + indexService = indicesService.indexServiceSafe(shardId.getIndex()); |
| 781 | + } catch (IndexNotFoundException ex) { |
| 782 | + return Optional.empty(); |
| 783 | + } |
| 784 | + try { |
| 785 | + return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); |
| 786 | + } catch (ShardNotFoundException ex) { |
| 787 | + return Optional.empty(); |
| 788 | + } |
| 789 | + }), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), threadPool); |
| 790 | + Directory dir = newDirectory(); |
| 791 | + IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); |
| 792 | + |
| 793 | + writer.addDocument(newDoc(0, "foo")); |
| 794 | + TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); |
| 795 | + BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); |
| 796 | + if (randomBoolean()) { |
| 797 | + writer.flush(); |
| 798 | + IOUtils.close(writer); |
| 799 | + writer = new IndexWriter(dir, newIndexWriterConfig()); |
| 800 | + } |
| 801 | + writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); |
| 802 | + DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); |
| 803 | + |
| 804 | + List<DirectoryReader> readersToClose = new ArrayList<>(); |
| 805 | + List<DirectoryReader> readersToKeep = new ArrayList<>(); |
| 806 | + // Put entries into the cache for each shard |
| 807 | + for (IndexService indexService : new IndexService[] { indexToKeep, indexToClose }) { |
| 808 | + for (int i = 0; i < numShards; i++) { |
| 809 | + IndexShard indexShard = indexService.getShard(i); |
| 810 | + IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); |
| 811 | + DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), indexShard.shardId()); |
| 812 | + if (indexService == indexToClose) { |
| 813 | + readersToClose.add(reader); |
| 814 | + } else { |
| 815 | + readersToKeep.add(reader); |
| 816 | + } |
| 817 | + Loader loader = new Loader(reader, 0); |
| 818 | + cache.getOrCompute(entity, loader, reader, termBytes); |
| 819 | + } |
| 820 | + } |
| 821 | + |
| 822 | + // Check resulting stats |
| 823 | + List<List<String>> initialDimensionValues = new ArrayList<>(); |
| 824 | + for (IndexService indexService : new IndexService[] { indexToKeep, indexToClose }) { |
| 825 | + for (int i = 0; i < numShards; i++) { |
| 826 | + ShardId shardId = indexService.getShard(i).shardId(); |
| 827 | + List<String> dimensionValues = List.of(shardId.getIndexName(), shardId.toString()); |
| 828 | + initialDimensionValues.add(dimensionValues); |
| 829 | + CacheStatsCounterSnapshot snapshot = ((MultiDimensionCacheStats) cache.getCacheStats()).getStatsForDimensionValues( |
| 830 | + dimensionValues |
| 831 | + ); |
| 832 | + assertNotNull(snapshot); |
| 833 | + // check the values are not empty by confirming entries != 0, this should always be true since the missed value is loaded |
| 834 | + // into the cache |
| 835 | + assertNotEquals(0, snapshot.getEntries()); |
| 836 | + } |
| 837 | + } |
| 838 | + |
| 839 | + // Delete an index |
| 840 | + indexToClose.close("test_deletion", true); |
| 841 | + // This actually closes the shards associated with the readers, which is necessary for cache cleanup logic |
| 842 | + // In this UT, manually close the readers as well; could not figure out how to connect all this up in a UT so that |
| 843 | + // we could get readers that were properly connected to an index's directory |
| 844 | + for (DirectoryReader reader : readersToClose) { |
| 845 | + IOUtils.close(reader); |
| 846 | + } |
| 847 | + // Trigger cache cleanup |
| 848 | + cache.cacheCleanupManager.cleanCache(); |
| 849 | + |
| 850 | + // Now stats for the closed index should be gone |
| 851 | + for (List<String> dimensionValues : initialDimensionValues) { |
| 852 | + CacheStatsCounterSnapshot snapshot = ((MultiDimensionCacheStats) cache.getCacheStats()).getStatsForDimensionValues( |
| 853 | + dimensionValues |
| 854 | + ); |
| 855 | + if (dimensionValues.get(0).equals(indexToCloseName)) { |
| 856 | + assertNull(snapshot); |
| 857 | + } else { |
| 858 | + assertNotNull(snapshot); |
| 859 | + // check the values are not empty by confirming entries != 0, this should always be true since the missed value is loaded |
| 860 | + // into the cache |
| 861 | + assertNotEquals(0, snapshot.getEntries()); |
| 862 | + } |
| 863 | + } |
| 864 | + |
| 865 | + for (DirectoryReader reader : readersToKeep) { |
| 866 | + IOUtils.close(reader); |
| 867 | + } |
| 868 | + IOUtils.close(secondReader, writer, dir, cache); |
| 869 | + terminate(threadPool); |
| 870 | + } |
| 871 | + |
756 | 872 | public void testEviction() throws Exception { |
757 | 873 | final ByteSizeValue size; |
758 | 874 | { |
|
0 commit comments