Skip to content

Commit 8f838f1

Browse files
kh3ra and Aditya Khera authored
Adding configurable resiliency features to MergedSegmentWarmer (#19629)
* Merge-related config changes

  Standardized merged-segment-warmer-related setting names. Added a threshold property that controls which merged segments get pre-copied (warmed), ensuring only segments larger than the specified size are warmed. Added cluster defaults for max_merge_count and max_merge_threads. AsyncPublishReferencedSegmentsTask only runs when INDICES_REPLICATION_MERGES_WARMER_ENABLED_SETTING = true.

  Signed-off-by: Aditya Khera <[email protected]>

* Empty commit to trigger build

  Signed-off-by: Aditya Khera <[email protected]>

* Fixing flakiness in ClusterMergeSchedulerConfigsIT

  Signed-off-by: Aditya Khera <[email protected]>

* Fixing breaking changes

  Signed-off-by: Aditya Khera <[email protected]>

* API changes

  Signed-off-by: Aditya Khera <[email protected]>

---------

Signed-off-by: Aditya Khera <[email protected]>
Co-authored-by: Aditya Khera <[email protected]>
1 parent f1a1e9f commit 8f838f1

File tree

20 files changed

+1806
-113
lines changed

20 files changed

+1806
-113
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1515
- Add metrics for the merged segment warmer feature ([#18929](https://github.com/opensearch-project/OpenSearch/pull/18929))
1616
- Add pointer based lag metric in pull-based ingestion ([#19635](https://github.com/opensearch-project/OpenSearch/pull/19635))
1717
- Introduced internal API for retrieving metadata about requested indices from transport actions ([#18523](https://github.com/opensearch-project/OpenSearch/pull/18523))
18+
- Add cluster defaults for merge autoThrottle, maxMergeThreads, and maxMergeCount; Add segment size filter to the merged segment warmer ([#19629](https://github.com/opensearch-project/OpenSearch/pull/19629))
1819

1920
### Changed
2021
- Faster `terms` query creation for `keyword` field with index and docValues enabled ([#19350](https://github.com/opensearch-project/OpenSearch/pull/19350))

server/src/internalClusterTest/java/org/opensearch/index/ClusterMergeSchedulerConfigsIT.java

Lines changed: 1049 additions & 0 deletions
Large diffs are not rendered by default.

server/src/internalClusterTest/java/org/opensearch/indices/replication/MergedSegmentWarmerIT.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -51,6 +51,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
5151
return Settings.builder()
5252
.put(super.nodeSettings(nodeOrdinal))
5353
.put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), true)
54+
.put(RecoverySettings.INDICES_REPLICATION_MERGES_WARMER_MIN_SEGMENT_SIZE_THRESHOLD_SETTING.getKey(), "1b")
5455
.build();
5556
}
5657

server/src/internalClusterTest/java/org/opensearch/indices/replication/RemoteStoreMergedSegmentWarmerIT.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -44,6 +44,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
4444
.put(super.nodeSettings(nodeOrdinal))
4545
.put(remoteStoreClusterSettings("test-remote-store-repo", absolutePath))
4646
.put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), true)
47+
.put(RecoverySettings.INDICES_REPLICATION_MERGES_WARMER_MIN_SEGMENT_SIZE_THRESHOLD_SETTING.getKey(), "1b")
4748
.build();
4849
}
4950

server/src/internalClusterTest/java/org/opensearch/merge/MergeStatsIT.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -51,6 +51,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
5151
return Settings.builder()
5252
.put(super.nodeSettings(nodeOrdinal))
5353
.put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), true)
54+
.put(RecoverySettings.INDICES_REPLICATION_MERGES_WARMER_MIN_SEGMENT_SIZE_THRESHOLD_SETTING.getKey(), "1b")
5455
.build();
5556
}
5657

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -124,6 +124,7 @@
124124
import org.opensearch.index.remote.RemoteStorePressureSettings;
125125
import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
126126
import org.opensearch.index.store.remote.filecache.FileCacheSettings;
127+
import org.opensearch.indices.ClusterMergeSchedulerConfig;
127128
import org.opensearch.indices.IndexingMemoryController;
128129
import org.opensearch.indices.IndicesQueryCache;
129130
import org.opensearch.indices.IndicesRequestCache;
@@ -307,6 +308,9 @@ public void apply(Settings value, Settings current, Settings previous) {
307308
IndicesQueryCache.INDICES_QUERIES_CACHE_SKIP_CACHE_FACTOR,
308309
IndicesQueryCache.INDICES_QUERY_CACHE_MIN_FREQUENCY,
309310
IndicesQueryCache.INDICES_QUERY_CACHE_COSTLY_MIN_FREQUENCY,
311+
ClusterMergeSchedulerConfig.CLUSTER_MAX_THREAD_COUNT_SETTING,
312+
ClusterMergeSchedulerConfig.CLUSTER_MAX_MERGE_COUNT_SETTING,
313+
ClusterMergeSchedulerConfig.CLUSTER_AUTO_THROTTLE_SETTING,
310314
IndicesService.CLUSTER_DEFAULT_INDEX_MAX_MERGE_AT_ONCE_SETTING,
311315
IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING,
312316
IndicesService.CLUSTER_MINIMUM_INDEX_REFRESH_INTERVAL_SETTING,
@@ -326,6 +330,7 @@ public void apply(Settings value, Settings current, Settings previous) {
326330
ShardLimitValidator.SETTING_CLUSTER_IGNORE_DOT_INDEXES,
327331
RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING,
328332
RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING,
333+
RecoverySettings.INDICES_REPLICATION_MERGES_WARMER_MIN_SEGMENT_SIZE_THRESHOLD_SETTING,
329334
RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING,
330335
RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_MAX_BYTES_PER_SEC_SETTING,
331336
RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_TIMEOUT_SETTING,

server/src/main/java/org/opensearch/index/IndexModule.java

Lines changed: 39 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -83,6 +83,7 @@
8383
import org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory;
8484
import org.opensearch.index.store.remote.filecache.FileCache;
8585
import org.opensearch.index.translog.TranslogFactory;
86+
import org.opensearch.indices.ClusterMergeSchedulerConfig;
8687
import org.opensearch.indices.IndicesQueryCache;
8788
import org.opensearch.indices.RemoteStoreSettings;
8889
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
@@ -670,33 +671,8 @@ public IndexService newIndexService(
670671
RemoteStoreSettings remoteStoreSettings,
671672
Supplier<Integer> clusterDefaultMaxMergeAtOnceSupplier
672673
) throws IOException {
673-
return newIndexService(
674-
indexCreationContext,
675-
environment,
676-
xContentRegistry,
677-
shardStoreDeleter,
678-
circuitBreakerService,
679-
bigArrays,
680-
threadPool,
681-
scriptService,
682-
clusterService,
683-
client,
684-
indicesQueryCache,
685-
mapperRegistry,
686-
indicesFieldDataCache,
687-
namedWriteableRegistry,
688-
idFieldDataEnabled,
689-
valuesSourceRegistry,
690-
remoteDirectoryFactory,
691-
translogFactorySupplier,
692-
clusterDefaultRefreshIntervalSupplier,
693-
fixedRefreshIntervalSchedulingEnabled,
694-
shardLevelRefreshEnabled,
695-
recoverySettings,
696-
remoteStoreSettings,
697-
(s) -> {},
698-
shardId -> ReplicationStats.empty(),
699-
clusterDefaultMaxMergeAtOnceSupplier
674+
throw new UnsupportedOperationException(
675+
"This API is removed in OpenSearch version 3.4.0. " + "Use the new overloaded newIndexService() method instead."
700676
);
701677
}
702678

@@ -727,6 +703,40 @@ public IndexService newIndexService(
727703
Consumer<IndexShard> replicator,
728704
Function<ShardId, ReplicationStats> segmentReplicationStatsProvider,
729705
Supplier<Integer> clusterDefaultMaxMergeAtOnceSupplier
706+
) throws IOException {
707+
throw new UnsupportedOperationException(
708+
"This API is removed in OpenSearch version 3.4.0. " + "Use the new overloaded newIndexService() method instead."
709+
);
710+
}
711+
712+
public IndexService newIndexService(
713+
IndexService.IndexCreationContext indexCreationContext,
714+
NodeEnvironment environment,
715+
NamedXContentRegistry xContentRegistry,
716+
IndexService.ShardStoreDeleter shardStoreDeleter,
717+
CircuitBreakerService circuitBreakerService,
718+
BigArrays bigArrays,
719+
ThreadPool threadPool,
720+
ScriptService scriptService,
721+
ClusterService clusterService,
722+
Client client,
723+
IndicesQueryCache indicesQueryCache,
724+
MapperRegistry mapperRegistry,
725+
IndicesFieldDataCache indicesFieldDataCache,
726+
NamedWriteableRegistry namedWriteableRegistry,
727+
BooleanSupplier idFieldDataEnabled,
728+
ValuesSourceRegistry valuesSourceRegistry,
729+
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
730+
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
731+
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
732+
Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled,
733+
Supplier<Boolean> shardLevelRefreshEnabled,
734+
RecoverySettings recoverySettings,
735+
RemoteStoreSettings remoteStoreSettings,
736+
Consumer<IndexShard> replicator,
737+
Function<ShardId, ReplicationStats> segmentReplicationStatsProvider,
738+
Supplier<Integer> clusterDefaultMaxMergeAtOnceSupplier,
739+
ClusterMergeSchedulerConfig clusterMergeSchedulerConfig
730740
) throws IOException {
731741
final IndexEventListener eventListener = freeze();
732742
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
@@ -798,7 +808,8 @@ public IndexService newIndexService(
798808
compositeIndexSettings,
799809
replicator,
800810
segmentReplicationStatsProvider,
801-
clusterDefaultMaxMergeAtOnceSupplier
811+
clusterDefaultMaxMergeAtOnceSupplier,
812+
clusterMergeSchedulerConfig
802813
);
803814
success = true;
804815
return indexService;

server/src/main/java/org/opensearch/index/IndexService.java

Lines changed: 36 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -99,6 +99,7 @@
9999
import org.opensearch.index.store.remote.filecache.FileCache;
100100
import org.opensearch.index.translog.Translog;
101101
import org.opensearch.index.translog.TranslogFactory;
102+
import org.opensearch.indices.ClusterMergeSchedulerConfig;
102103
import org.opensearch.indices.RemoteStoreSettings;
103104
import org.opensearch.indices.cluster.IndicesClusterStateService;
104105
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
@@ -252,7 +253,8 @@ public IndexService(
252253
CompositeIndexSettings compositeIndexSettings,
253254
Consumer<IndexShard> replicator,
254255
Function<ShardId, ReplicationStats> segmentReplicationStatsProvider,
255-
Supplier<Integer> clusterDefaultMaxMergeAtOnceSupplier
256+
Supplier<Integer> clusterDefaultMaxMergeAtOnceSupplier,
257+
ClusterMergeSchedulerConfig clusterMergeSchedulerConfig
256258
) {
257259
super(indexSettings);
258260
this.storeFactory = storeFactory;
@@ -352,6 +354,11 @@ public IndexService(
352354
this.replicator = replicator;
353355
this.segmentReplicationStatsProvider = segmentReplicationStatsProvider;
354356
indexSettings.setDefaultMaxMergesAtOnce(clusterDefaultMaxMergeAtOnceSupplier.get());
357+
indexSettings.setDefaultMaxThreadAndMergeCount(
358+
clusterMergeSchedulerConfig.getClusterMaxThreadCount(),
359+
clusterMergeSchedulerConfig.getClusterMaxMergeCount()
360+
);
361+
indexSettings.setDefaultAutoThrottleEnabled(clusterMergeSchedulerConfig.getClusterMergeAutoThrottleEnabled());
355362
updateFsyncTaskIfNecessary();
356363
synchronized (refreshMutex) {
357364
if (shardLevelRefreshEnabled == false) {
@@ -400,7 +407,8 @@ public IndexService(
400407
boolean shardLevelRefreshEnabled,
401408
RecoverySettings recoverySettings,
402409
RemoteStoreSettings remoteStoreSettings,
403-
Supplier<Integer> clusterDefaultMaxMergeAtOnce
410+
Supplier<Integer> clusterDefaultMaxMergeAtOnce,
411+
ClusterMergeSchedulerConfig clusterMergeSchedulerConfig
404412
) {
405413
this(
406414
indexSettings,
@@ -445,7 +453,8 @@ public IndexService(
445453
null,
446454
s -> {},
447455
(shardId) -> ReplicationStats.empty(),
448-
clusterDefaultMaxMergeAtOnce
456+
clusterDefaultMaxMergeAtOnce,
457+
clusterMergeSchedulerConfig
449458
);
450459
}
451460

@@ -1235,6 +1244,21 @@ public void onDefaultMaxMergeAtOnceChanged(int newDefaultMaxMergeAtOnce) {
12351244
indexSettings.setDefaultMaxMergesAtOnce(newDefaultMaxMergeAtOnce);
12361245
}
12371246

1247+
/**
1248+
* Called when the cluster level settings: {@code cluster.default.index.merge.scheduler.max_merge_count} OR
1249+
* {@code cluster.default.index.merge.scheduler.max_thread_count} change.
1250+
*/
1251+
public void onDefaultMaxMergeOrThreadCountUpdate(int maxThreadCount, int maxMergeCount) {
1252+
indexSettings.setDefaultMaxThreadAndMergeCount(maxThreadCount, maxMergeCount);
1253+
}
1254+
1255+
/**
1256+
* Called whenever the cluster level {@code cluster.default.index.merge.scheduler.auto_throttle} changes.
1257+
*/
1258+
public void onDefaultAutoThrottleEnabledUpdate(boolean enabled) {
1259+
indexSettings.setDefaultAutoThrottleEnabled(enabled);
1260+
}
1261+
12381262
/**
12391263
* Called whenever the cluster level {@code cluster.merge.scheduler.max_force_merge_mb_per_sec} changes.
12401264
* The change is only applied if the index doesn't have its own explicit force merge MB per sec setting.
@@ -1580,7 +1604,9 @@ final class AsyncPublishReferencedSegmentsTask extends BaseAsyncTask {
15801604

15811605
@Override
15821606
protected void runInternal() {
1583-
indexService.maybePublishReferencedSegments();
1607+
if (shouldRun()) {
1608+
indexService.maybePublishReferencedSegments();
1609+
}
15841610
}
15851611

15861612
@Override
@@ -1597,6 +1623,12 @@ public String toString() {
15971623
protected boolean mustReschedule() {
15981624
return indexSettings.isSegRepEnabledOrRemoteNode() && super.mustReschedule();
15991625
}
1626+
1627+
// visible for tests
1628+
protected boolean shouldRun() {
1629+
return (indexSettings.isSegRepLocalEnabled() || indexSettings.isRemoteStoreEnabled())
1630+
&& recoverySettings.isMergedSegmentReplicationWarmerEnabled();
1631+
}
16001632
}
16011633

16021634
private void maybePublishReferencedSegments() {

server/src/main/java/org/opensearch/index/IndexSettings.java

Lines changed: 21 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1314,6 +1314,27 @@ void setDefaultMaxMergesAtOnce(int newDefaultMaxMergesAtOnce) {
13141314
}
13151315
}
13161316

1317+
/**
1318+
* Update the cached defaults for {@code MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING} and
1319+
* {@code MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING}
1320+
*/
1321+
void setDefaultMaxThreadAndMergeCount(int maxThreadCount, int maxMergeCount) {
1322+
// Upon updates to the cluster default settings, we always update the cached default values in
1323+
// the MergeSchedulerConfig, but we only update the actual values when an index level setting is not present
1324+
boolean overrideExistingConfigs = MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.exists(getSettings()) == false
1325+
&& MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.exists(getSettings()) == false;
1326+
mergeSchedulerConfig.setDefaultMaxThreadAndMergeCount(maxThreadCount, maxMergeCount, overrideExistingConfigs);
1327+
}
1328+
1329+
void setDefaultAutoThrottleEnabled(boolean enabled) {
1330+
// Upon updates to the cluster default settings, we always update the cached default values in
1331+
// the MergeSchedulerConfig, but we only update the actual values when an index level setting is not present
1332+
mergeSchedulerConfig.setDefaultAutoThrottleEnabled(
1333+
enabled,
1334+
MergeSchedulerConfig.AUTO_THROTTLE_SETTING.exists(getSettings()) == false
1335+
);
1336+
}
1337+
13171338
/**
13181339
* Updates the maxMergesAtOnce for actual TieredMergePolicy used by the engine.
13191340
* Sets it to default maxMergesAtOnce if index level settings is being removed

0 commit comments

Comments
 (0)