Skip to content

Commit c564ee3

Browse files
Cleanup file cache on deleting index/shard directory (opensearch-project#11443)
* cleanup file cache on deleting index/shard directory Signed-off-by: panguixin <[email protected]> * add index store listener Signed-off-by: panguixin <[email protected]> --------- Signed-off-by: panguixin <[email protected]>
1 parent 26a66f0 commit c564ee3

File tree

8 files changed

+235
-72
lines changed

8 files changed

+235
-72
lines changed

server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,17 @@
3030
import org.opensearch.cluster.routing.GroupShardsIterator;
3131
import org.opensearch.cluster.routing.ShardIterator;
3232
import org.opensearch.cluster.routing.ShardRouting;
33+
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
3334
import org.opensearch.common.Priority;
3435
import org.opensearch.common.io.PathUtils;
3536
import org.opensearch.common.settings.Settings;
3637
import org.opensearch.common.unit.TimeValue;
3738
import org.opensearch.core.common.unit.ByteSizeUnit;
3839
import org.opensearch.core.index.Index;
40+
import org.opensearch.core.index.shard.ShardId;
3941
import org.opensearch.index.IndexModule;
4042
import org.opensearch.index.IndexNotFoundException;
43+
import org.opensearch.index.shard.ShardPath;
4144
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
4245
import org.opensearch.index.store.remote.filecache.FileCacheStats;
4346
import org.opensearch.monitor.fs.FsInfo;
@@ -859,4 +862,75 @@ private void assertCacheDirectoryReplicaAndIndexCount(int numCacheFolderCount, i
859862
// Verifies if all the shards (primary and replica) have been deleted
860863
assertEquals(numCacheFolderCount, searchNodeFileCachePaths.size());
861864
}
865+
866+
public void testRelocateSearchableSnapshotIndex() throws Exception {
867+
final String snapshotName = "test-snap";
868+
final String repoName = "test-repo";
869+
final String indexName = "test-idx-1";
870+
final String restoredIndexName = indexName + "-copy";
871+
final Client client = client();
872+
873+
internalCluster().ensureAtLeastNumDataNodes(1);
874+
createIndexWithDocsAndEnsureGreen(0, 100, indexName);
875+
876+
createRepositoryWithSettings(null, repoName);
877+
takeSnapshot(client, snapshotName, repoName, indexName);
878+
deleteIndicesAndEnsureGreen(client, indexName);
879+
880+
String searchNode1 = internalCluster().startSearchOnlyNodes(1).get(0);
881+
internalCluster().validateClusterFormed();
882+
restoreSnapshotAndEnsureGreen(client, snapshotName, repoName);
883+
assertRemoteSnapshotIndexSettings(client, restoredIndexName);
884+
885+
String searchNode2 = internalCluster().startSearchOnlyNodes(1).get(0);
886+
internalCluster().validateClusterFormed();
887+
888+
final Index index = resolveIndex(restoredIndexName);
889+
assertSearchableSnapshotIndexDirectoryExistence(searchNode1, index, true);
890+
assertSearchableSnapshotIndexDirectoryExistence(searchNode2, index, false);
891+
892+
// relocate the shard from node1 to node2
893+
client.admin()
894+
.cluster()
895+
.prepareReroute()
896+
.add(new MoveAllocationCommand(restoredIndexName, 0, searchNode1, searchNode2))
897+
.execute()
898+
.actionGet();
899+
ClusterHealthResponse clusterHealthResponse = client.admin()
900+
.cluster()
901+
.prepareHealth()
902+
.setWaitForEvents(Priority.LANGUID)
903+
.setWaitForNoRelocatingShards(true)
904+
.setTimeout(new TimeValue(5, TimeUnit.MINUTES))
905+
.execute()
906+
.actionGet();
907+
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
908+
assertDocCount(restoredIndexName, 100L);
909+
910+
assertSearchableSnapshotIndexDirectoryExistence(searchNode1, index, false);
911+
assertSearchableSnapshotIndexDirectoryExistence(searchNode2, index, true);
912+
deleteIndicesAndEnsureGreen(client, restoredIndexName);
913+
assertSearchableSnapshotIndexDirectoryExistence(searchNode2, index, false);
914+
}
915+
916+
private void assertSearchableSnapshotIndexDirectoryExistence(String nodeName, Index index, boolean exists) throws Exception {
917+
final Node node = internalCluster().getInstance(Node.class, nodeName);
918+
final ShardId shardId = new ShardId(index, 0);
919+
final ShardPath shardPath = ShardPath.loadFileCachePath(node.getNodeEnvironment(), shardId);
920+
921+
assertBusy(() -> {
922+
assertTrue(
923+
"shard state path should " + (exists ? "exist" : "not exist"),
924+
Files.exists(shardPath.getShardStatePath()) == exists
925+
);
926+
assertTrue("shard cache path should " + (exists ? "exist" : "not exist"), Files.exists(shardPath.getDataPath()) == exists);
927+
}, 30, TimeUnit.SECONDS);
928+
929+
final Path indexDataPath = node.getNodeEnvironment().fileCacheNodePath().fileCachePath.resolve(index.getUUID());
930+
final Path indexPath = node.getNodeEnvironment().fileCacheNodePath().indicesPath.resolve(index.getUUID());
931+
assertBusy(() -> {
932+
assertTrue("index path should " + (exists ? "exist" : "not exist"), Files.exists(indexDataPath) == exists);
933+
assertTrue("index cache path should " + (exists ? "exist" : "not exist"), Files.exists(indexPath) == exists);
934+
}, 30, TimeUnit.SECONDS);
935+
}
862936
}

server/src/main/java/org/opensearch/env/NodeEnvironment.java

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,8 @@ public String toString() {
199199

200200
private final NodeMetadata nodeMetadata;
201201

202+
private final IndexStoreListener indexStoreListener;
203+
202204
/**
203205
* Maximum number of data nodes that should run in an environment.
204206
*/
@@ -295,18 +297,23 @@ public void close() {
295297
}
296298
}
297299

300+
public NodeEnvironment(Settings settings, Environment environment) throws IOException {
301+
this(settings, environment, IndexStoreListener.EMPTY);
302+
}
303+
298304
/**
299305
* Setup the environment.
300306
* @param settings settings from opensearch.yml
301307
*/
302-
public NodeEnvironment(Settings settings, Environment environment) throws IOException {
303-
if (!DiscoveryNode.nodeRequiresLocalStorage(settings)) {
308+
public NodeEnvironment(Settings settings, Environment environment, IndexStoreListener indexStoreListener) throws IOException {
309+
if (DiscoveryNode.nodeRequiresLocalStorage(settings) == false) {
304310
nodePaths = null;
305311
fileCacheNodePath = null;
306312
sharedDataPath = null;
307313
locks = null;
308314
nodeLockId = -1;
309315
nodeMetadata = new NodeMetadata(generateNodeId(settings), Version.CURRENT);
316+
this.indexStoreListener = IndexStoreListener.EMPTY;
310317
return;
311318
}
312319
boolean success = false;
@@ -385,6 +392,7 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce
385392
}
386393

387394
this.nodeMetadata = loadNodeMetadata(settings, logger, nodePaths);
395+
this.indexStoreListener = indexStoreListener;
388396
success = true;
389397
} finally {
390398
if (success == false) {
@@ -577,6 +585,9 @@ public static void acquireFSLockForPaths(IndexSettings indexSettings, Path... sh
577585
public void deleteShardDirectoryUnderLock(ShardLock lock, IndexSettings indexSettings) throws IOException {
578586
final ShardId shardId = lock.getShardId();
579587
assert isShardLocked(shardId) : "shard " + shardId + " is not locked";
588+
589+
indexStoreListener.beforeShardPathDeleted(shardId, indexSettings, this);
590+
580591
final Path[] paths = availableShardPaths(shardId);
581592
logger.trace("acquiring locks for {}, paths: [{}]", shardId, paths);
582593
acquireFSLockForPaths(indexSettings, paths);
@@ -653,6 +664,8 @@ public void deleteIndexDirectorySafe(Index index, long lockTimeoutMS, IndexSetti
653664
* @param indexSettings settings for the index being deleted
654665
*/
655666
public void deleteIndexDirectoryUnderLock(Index index, IndexSettings indexSettings) throws IOException {
667+
indexStoreListener.beforeIndexPathDeleted(index, indexSettings, this);
668+
656669
final Path[] indexPaths = indexPaths(index);
657670
logger.trace("deleting index {} directory, paths({}): [{}]", index, indexPaths.length, indexPaths);
658671
IOUtils.rm(indexPaths);
@@ -663,6 +676,18 @@ public void deleteIndexDirectoryUnderLock(Index index, IndexSettings indexSettin
663676
}
664677
}
665678

679+
private void deleteIndexFileCacheDirectory(Index index) {
680+
final Path indexCachePath = fileCacheNodePath().fileCachePath.resolve(index.getUUID());
681+
logger.trace("deleting index {} file cache directory, path: [{}]", index, indexCachePath);
682+
if (Files.exists(indexCachePath)) {
683+
try {
684+
IOUtils.rm(indexCachePath);
685+
} catch (IOException e) {
686+
logger.error(() -> new ParameterizedMessage("Failed to delete cache path for index {}", index), e);
687+
}
688+
}
689+
}
690+
666691
/**
667692
* Tries to lock all local shards for the given index. If any of the shard locks can't be acquired
668693
* a {@link ShardLockObtainFailedException} is thrown and all previously acquired locks are released.
@@ -1387,4 +1412,18 @@ private static void tryWriteTempFile(Path path) throws IOException {
13871412
}
13881413
}
13891414
}
1415+
1416+
/**
1417+
* A listener that is executed on per-index and per-shard store events, like deleting shard path
1418+
*
1419+
* @opensearch.internal
1420+
*/
1421+
public interface IndexStoreListener {
1422+
default void beforeShardPathDeleted(ShardId shardId, IndexSettings indexSettings, NodeEnvironment env) {}
1423+
1424+
default void beforeIndexPathDeleted(Index index, IndexSettings indexSettings, NodeEnvironment env) {}
1425+
1426+
IndexStoreListener EMPTY = new IndexStoreListener() {
1427+
};
1428+
}
13901429
}

server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java

Lines changed: 54 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,13 @@
1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
1313
import org.apache.logging.log4j.message.ParameterizedMessage;
14-
import org.opensearch.common.settings.Settings;
14+
import org.opensearch.common.inject.Provider;
1515
import org.opensearch.common.util.io.IOUtils;
1616
import org.opensearch.core.index.Index;
1717
import org.opensearch.core.index.shard.ShardId;
1818
import org.opensearch.env.NodeEnvironment;
19-
import org.opensearch.index.IndexModule;
2019
import org.opensearch.index.IndexSettings;
21-
import org.opensearch.index.shard.IndexEventListener;
2220
import org.opensearch.index.shard.ShardPath;
23-
import org.opensearch.indices.cluster.IndicesClusterStateService;
2421

2522
import java.io.IOException;
2623
import java.nio.file.DirectoryStream;
@@ -30,79 +27,90 @@
3027
import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory.LOCAL_STORE_LOCATION;
3128

3229
/**
33-
* IndexEventListener to clean up file cache when the index is deleted. The cached entries will be eligible
30+
* IndexStoreListener to clean up file cache when the index is deleted. The cached entries will be eligible
3431
* for eviction when the shard is deleted, but this listener deterministically removes entries from memory and
3532
* from disk at the time of shard deletion as opposed to waiting for the cache to need to perform eviction.
3633
*
3734
* @opensearch.internal
3835
*/
39-
public class FileCacheCleaner implements IndexEventListener {
40-
private static final Logger log = LogManager.getLogger(FileCacheCleaner.class);
36+
public class FileCacheCleaner implements NodeEnvironment.IndexStoreListener {
37+
private static final Logger logger = LogManager.getLogger(FileCacheCleaner.class);
4138

42-
private final NodeEnvironment nodeEnvironment;
43-
private final FileCache fileCache;
39+
private final Provider<FileCache> fileCacheProvider;
4440

45-
public FileCacheCleaner(NodeEnvironment nodeEnvironment, FileCache fileCache) {
46-
this.nodeEnvironment = nodeEnvironment;
47-
this.fileCache = fileCache;
41+
public FileCacheCleaner(Provider<FileCache> fileCacheProvider) {
42+
this.fileCacheProvider = fileCacheProvider;
4843
}
4944

5045
/**
51-
* before shard deleted and after shard closed, cleans up the corresponding index file path entries from FC.
52-
* @param shardId The shard id
53-
* @param settings the shards index settings
46+
* before shard path deleted, cleans up the corresponding index file path entries from FC and delete the corresponding shard file
47+
* cache path.
48+
*
49+
* @param shardId the shard id
50+
* @param indexSettings the index settings
51+
* @param nodeEnvironment the node environment
5452
*/
5553
@Override
56-
public void beforeIndexShardDeleted(ShardId shardId, Settings settings) {
54+
public void beforeShardPathDeleted(ShardId shardId, IndexSettings indexSettings, NodeEnvironment nodeEnvironment) {
55+
if (indexSettings.isRemoteSnapshot()) {
56+
final ShardPath shardPath = ShardPath.loadFileCachePath(nodeEnvironment, shardId);
57+
cleanupShardFileCache(shardPath);
58+
deleteShardFileCacheDirectory(shardPath);
59+
}
60+
}
61+
62+
/**
63+
* Cleans up the corresponding index file path entries from FileCache
64+
*
65+
* @param shardPath the shard path
66+
*/
67+
private void cleanupShardFileCache(ShardPath shardPath) {
5768
try {
58-
if (isRemoteSnapshot(settings)) {
59-
final ShardPath shardPath = ShardPath.loadFileCachePath(nodeEnvironment, shardId);
60-
final Path localStorePath = shardPath.getDataPath().resolve(LOCAL_STORE_LOCATION);
61-
try (DirectoryStream<Path> ds = Files.newDirectoryStream(localStorePath)) {
62-
for (Path subPath : ds) {
63-
fileCache.remove(subPath.toRealPath());
64-
}
69+
final FileCache fc = fileCacheProvider.get();
70+
assert fc != null;
71+
final Path localStorePath = shardPath.getDataPath().resolve(LOCAL_STORE_LOCATION);
72+
try (DirectoryStream<Path> ds = Files.newDirectoryStream(localStorePath)) {
73+
for (Path subPath : ds) {
74+
fc.remove(subPath.toRealPath());
6575
}
6676
}
6777
} catch (IOException ioe) {
68-
log.error(() -> new ParameterizedMessage("Error removing items from cache during shard deletion {}", shardId), ioe);
78+
logger.error(
79+
() -> new ParameterizedMessage("Error removing items from cache during shard deletion {}", shardPath.getShardId()),
80+
ioe
81+
);
6982
}
7083
}
7184

72-
@Override
73-
public void afterIndexShardDeleted(ShardId shardId, Settings settings) {
74-
if (isRemoteSnapshot(settings)) {
75-
final Path path = ShardPath.loadFileCachePath(nodeEnvironment, shardId).getDataPath();
76-
try {
77-
if (Files.exists(path)) {
78-
IOUtils.rm(path);
79-
}
80-
} catch (IOException e) {
81-
log.error(() -> new ParameterizedMessage("Failed to delete cache path for shard {}", shardId), e);
85+
private void deleteShardFileCacheDirectory(ShardPath shardPath) {
86+
final Path path = shardPath.getDataPath();
87+
try {
88+
if (Files.exists(path)) {
89+
IOUtils.rm(path);
8290
}
91+
} catch (IOException e) {
92+
logger.error(() -> new ParameterizedMessage("Failed to delete cache path for shard {}", shardPath.getShardId()), e);
8393
}
8494
}
8595

96+
/**
97+
* before index path deleted, delete the corresponding index file cache path.
98+
*
99+
* @param index the index
100+
* @param indexSettings the index settings
101+
* @param nodeEnvironment the node environment
102+
*/
86103
@Override
87-
public void afterIndexRemoved(
88-
Index index,
89-
IndexSettings indexSettings,
90-
IndicesClusterStateService.AllocatedIndices.IndexRemovalReason reason
91-
) {
92-
if (isRemoteSnapshot(indexSettings.getSettings())
93-
&& reason == IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.DELETED) {
104+
public void beforeIndexPathDeleted(Index index, IndexSettings indexSettings, NodeEnvironment nodeEnvironment) {
105+
if (indexSettings.isRemoteSnapshot()) {
94106
final Path indexCachePath = nodeEnvironment.fileCacheNodePath().fileCachePath.resolve(index.getUUID());
95107
if (Files.exists(indexCachePath)) {
96108
try {
97109
IOUtils.rm(indexCachePath);
98110
} catch (IOException e) {
99-
log.error(() -> new ParameterizedMessage("Failed to delete cache path for index {}", index), e);
111+
logger.error(() -> new ParameterizedMessage("Failed to delete cache path for index {}", index), e);
100112
}
101113
}
102114
}
103115
}
104-
105-
private static boolean isRemoteSnapshot(Settings settings) {
106-
return IndexModule.Type.REMOTE_SNAPSHOT.match(settings.get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()));
107-
}
108116
}

server/src/main/java/org/opensearch/indices/IndicesService.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,6 @@
135135
import org.opensearch.index.shard.IndexingOperationListener;
136136
import org.opensearch.index.shard.IndexingStats;
137137
import org.opensearch.index.shard.IndexingStats.Stats.DocStatusStats;
138-
import org.opensearch.index.store.remote.filecache.FileCacheCleaner;
139138
import org.opensearch.index.translog.InternalTranslogFactory;
140139
import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory;
141140
import org.opensearch.index.translog.TranslogFactory;
@@ -362,7 +361,6 @@ public class IndicesService extends AbstractLifecycleComponent
362361
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;
363362
private volatile TimeValue clusterDefaultRefreshInterval;
364363
private volatile TimeValue clusterRemoteTranslogBufferInterval;
365-
private final FileCacheCleaner fileCacheCleaner;
366364

367365
private final SearchRequestStats searchRequestStats;
368366

@@ -395,7 +393,6 @@ public IndicesService(
395393
Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
396394
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
397395
Supplier<RepositoriesService> repositoriesServiceSupplier,
398-
FileCacheCleaner fileCacheCleaner,
399396
SearchRequestStats searchRequestStats,
400397
@Nullable RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory,
401398
RecoverySettings recoverySettings
@@ -450,7 +447,6 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon
450447

451448
this.directoryFactories = directoryFactories;
452449
this.recoveryStateFactories = recoveryStateFactories;
453-
this.fileCacheCleaner = fileCacheCleaner;
454450
// doClose() is called when shutting down a node, yet there might still be ongoing requests
455451
// that we need to wait for before closing some resources such as the caches. In order to
456452
// avoid closing these resources while ongoing requests are still being processed, we use a
@@ -766,7 +762,6 @@ public void onStoreClosed(ShardId shardId) {
766762
};
767763
finalListeners.add(onStoreClose);
768764
finalListeners.add(oldShardsStats);
769-
finalListeners.add(fileCacheCleaner);
770765
final IndexService indexService = createIndexService(
771766
CREATE_INDEX,
772767
indexMetadata,

0 commit comments

Comments
 (0)