Skip to content

Commit 112ce0d

Browse files
committed
Resolved conflicts
Signed-off-by: Sriram Ganesh <[email protected]>
1 parent 022d594 commit 112ce0d

File tree

11 files changed

+1405
-0
lines changed

11 files changed

+1405
-0
lines changed

CHANGELOG.md

Lines changed: 163 additions & 0 deletions
Large diffs are not rendered by default.

server/src/main/java/org/opensearch/action/ActionModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,7 @@
345345
import org.opensearch.rest.RestHeaderDefinition;
346346
import org.opensearch.rest.action.RestFieldCapabilitiesAction;
347347
import org.opensearch.rest.action.RestMainAction;
348+
import org.opensearch.rest.action.admin.RestSegmentTopologyAction;
348349
import org.opensearch.rest.action.admin.cluster.RestAddVotingConfigExclusionAction;
349350
import org.opensearch.rest.action.admin.cluster.RestCancelTasksAction;
350351
import org.opensearch.rest.action.admin.cluster.RestCleanupRepositoryAction;
@@ -1013,6 +1014,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
10131014
registerHandler.accept(new RestTasksAction(nodesInCluster));
10141015
registerHandler.accept(new RestIndicesAction(responseLimitSettings));
10151016
registerHandler.accept(new RestSegmentsAction(responseLimitSettings));
1017+
registerHandler.accept(new RestSegmentTopologyAction());
10161018
// Fully qualified to prevent interference with rest.action.count.RestCountAction
10171019
registerHandler.accept(new org.opensearch.rest.action.cat.RestCountAction());
10181020
// Fully qualified to prevent interference with rest.action.indices.RestRecoveryAction
Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index;
10+
11+
import org.apache.logging.log4j.Logger;
12+
import org.apache.lucene.index.TieredMergePolicy;
13+
import org.opensearch.common.annotation.PublicApi;
14+
import org.opensearch.core.common.unit.ByteSizeUnit;
15+
import org.opensearch.core.common.unit.ByteSizeValue;
16+
import org.opensearch.index.store.Store;
17+
18+
/**
19+
* An adaptive merge policy provider that adjusts merge settings based on shard size
20+
* to optimize segment topology and reduce benchmark variance.
21+
*
22+
* This addresses the issue described in https://github.com/opensearch-project/OpenSearch/issues/11163
23+
* by providing more intelligent default merge settings that adapt to the actual shard size.
24+
*
25+
* @opensearch.api
26+
*/
27+
@PublicApi(since = "3.3.0")
28+
public class AdaptiveTieredMergePolicyProvider implements MergePolicyProvider {
29+
30+
private final Logger logger;
31+
private final OpenSearchTieredMergePolicy tieredMergePolicy;
32+
private Store store;
33+
private boolean mergesEnabled;
34+
35+
// Adaptive settings based on shard size
36+
private static final ByteSizeValue SMALL_SHARD_THRESHOLD = new ByteSizeValue(100, ByteSizeUnit.MB);
37+
private static final ByteSizeValue MEDIUM_SHARD_THRESHOLD = new ByteSizeValue(1, ByteSizeUnit.GB);
38+
private static final ByteSizeValue LARGE_SHARD_THRESHOLD = new ByteSizeValue(10, ByteSizeUnit.GB);
39+
40+
// Adaptive max segment sizes
41+
private static final ByteSizeValue SMALL_SHARD_MAX_SEGMENT = new ByteSizeValue(50, ByteSizeUnit.MB);
42+
private static final ByteSizeValue MEDIUM_SHARD_MAX_SEGMENT = new ByteSizeValue(200, ByteSizeUnit.MB);
43+
private static final ByteSizeValue LARGE_SHARD_MAX_SEGMENT = new ByteSizeValue(1, ByteSizeUnit.GB);
44+
private static final ByteSizeValue VERY_LARGE_SHARD_MAX_SEGMENT = new ByteSizeValue(2, ByteSizeUnit.GB);
45+
46+
// Adaptive floor segment sizes
47+
private static final ByteSizeValue SMALL_SHARD_FLOOR = new ByteSizeValue(10, ByteSizeUnit.MB);
48+
private static final ByteSizeValue MEDIUM_SHARD_FLOOR = new ByteSizeValue(25, ByteSizeUnit.MB);
49+
private static final ByteSizeValue LARGE_SHARD_FLOOR = new ByteSizeValue(50, ByteSizeUnit.MB);
50+
private static final ByteSizeValue VERY_LARGE_SHARD_FLOOR = new ByteSizeValue(100, ByteSizeUnit.MB);
51+
52+
// Adaptive segments per tier
53+
private static final double SMALL_SHARD_SEGMENTS_PER_TIER = 5.0;
54+
private static final double MEDIUM_SHARD_SEGMENTS_PER_TIER = 8.0;
55+
private static final double LARGE_SHARD_SEGMENTS_PER_TIER = 10.0;
56+
private static final double VERY_LARGE_SHARD_SEGMENTS_PER_TIER = 12.0;
57+
58+
public AdaptiveTieredMergePolicyProvider(Logger logger, IndexSettings indexSettings) {
59+
this.logger = logger;
60+
this.store = null; // Will be set later via setStore()
61+
this.tieredMergePolicy = new OpenSearchTieredMergePolicy();
62+
this.mergesEnabled = indexSettings.getSettings().getAsBoolean("index.merge.enabled", true);
63+
64+
if (mergesEnabled == false) {
65+
logger.warn(
66+
"[index.merge.enabled] is set to false, this should only be used in tests and can cause serious problems in production environments"
67+
);
68+
}
69+
70+
// Initialize with default settings first, will be updated when store is available
71+
applyDefaultSettings();
72+
}
73+
74+
public AdaptiveTieredMergePolicyProvider(Logger logger, IndexSettings indexSettings, Store store) {
75+
this.logger = logger;
76+
this.store = store;
77+
this.tieredMergePolicy = new OpenSearchTieredMergePolicy();
78+
this.mergesEnabled = indexSettings.getSettings().getAsBoolean("index.merge.enabled", true);
79+
80+
if (mergesEnabled == false) {
81+
logger.warn(
82+
"[index.merge.enabled] is set to false, this should only be used in tests and can cause serious problems in production environments"
83+
);
84+
}
85+
86+
// Initialize with adaptive settings
87+
initializeAdaptiveSettings();
88+
}
89+
90+
private void initializeAdaptiveSettings() {
91+
try {
92+
// Estimate shard size from store
93+
long estimatedShardSize = estimateShardSize();
94+
ShardSizeCategory category = categorizeShardSize(estimatedShardSize);
95+
96+
// Apply adaptive settings based on shard size category
97+
applyAdaptiveSettings(category);
98+
99+
logger.debug(
100+
"Initialized adaptive merge policy for shard size category: {} (estimated size: {})",
101+
category,
102+
new ByteSizeValue(estimatedShardSize)
103+
);
104+
105+
} catch (Exception e) {
106+
logger.warn("Failed to initialize adaptive settings, falling back to defaults: {}", e.getMessage());
107+
applyDefaultSettings();
108+
}
109+
}
110+
111+
private long estimateShardSize() {
112+
if (store == null) {
113+
// Fallback to a reasonable default when store is not available
114+
return MEDIUM_SHARD_THRESHOLD.getBytes();
115+
}
116+
try {
117+
// Try to get a rough estimate of shard size from the store
118+
// This is a best-effort estimation - using directory size as proxy
119+
return store.directory().listAll().length * 1024 * 1024; // Rough estimate
120+
} catch (Exception e) {
121+
// Fallback to a reasonable default
122+
return MEDIUM_SHARD_THRESHOLD.getBytes();
123+
}
124+
}
125+
126+
private ShardSizeCategory categorizeShardSize(long sizeBytes) {
127+
if (sizeBytes < SMALL_SHARD_THRESHOLD.getBytes()) {
128+
return ShardSizeCategory.SMALL;
129+
} else if (sizeBytes < MEDIUM_SHARD_THRESHOLD.getBytes()) {
130+
return ShardSizeCategory.MEDIUM;
131+
} else if (sizeBytes < LARGE_SHARD_THRESHOLD.getBytes()) {
132+
return ShardSizeCategory.LARGE;
133+
} else {
134+
return ShardSizeCategory.VERY_LARGE;
135+
}
136+
}
137+
138+
private void applyAdaptiveSettings(ShardSizeCategory category) {
139+
ByteSizeValue maxSegmentSize;
140+
ByteSizeValue floorSegmentSize;
141+
double segmentsPerTier;
142+
143+
switch (category) {
144+
case SMALL:
145+
maxSegmentSize = SMALL_SHARD_MAX_SEGMENT;
146+
floorSegmentSize = SMALL_SHARD_FLOOR;
147+
segmentsPerTier = SMALL_SHARD_SEGMENTS_PER_TIER;
148+
break;
149+
case MEDIUM:
150+
maxSegmentSize = MEDIUM_SHARD_MAX_SEGMENT;
151+
floorSegmentSize = MEDIUM_SHARD_FLOOR;
152+
segmentsPerTier = MEDIUM_SHARD_SEGMENTS_PER_TIER;
153+
break;
154+
case LARGE:
155+
maxSegmentSize = LARGE_SHARD_MAX_SEGMENT;
156+
floorSegmentSize = LARGE_SHARD_FLOOR;
157+
segmentsPerTier = LARGE_SHARD_SEGMENTS_PER_TIER;
158+
break;
159+
case VERY_LARGE:
160+
maxSegmentSize = VERY_LARGE_SHARD_MAX_SEGMENT;
161+
floorSegmentSize = VERY_LARGE_SHARD_FLOOR;
162+
segmentsPerTier = VERY_LARGE_SHARD_SEGMENTS_PER_TIER;
163+
break;
164+
default:
165+
maxSegmentSize = MEDIUM_SHARD_MAX_SEGMENT;
166+
floorSegmentSize = MEDIUM_SHARD_FLOOR;
167+
segmentsPerTier = MEDIUM_SHARD_SEGMENTS_PER_TIER;
168+
}
169+
170+
// Apply the adaptive settings
171+
tieredMergePolicy.setMaxMergedSegmentMB(maxSegmentSize.getMbFrac());
172+
tieredMergePolicy.setFloorSegmentMB(floorSegmentSize.getMbFrac());
173+
tieredMergePolicy.setSegmentsPerTier(segmentsPerTier);
174+
175+
// Keep other settings at reasonable defaults
176+
tieredMergePolicy.setMaxMergeAtOnce(10);
177+
tieredMergePolicy.setForceMergeDeletesPctAllowed(10.0);
178+
tieredMergePolicy.setDeletesPctAllowed(20.0);
179+
tieredMergePolicy.setNoCFSRatio(TieredMergePolicy.DEFAULT_NO_CFS_RATIO);
180+
181+
logger.info(
182+
"Applied adaptive merge settings - max_segment: {}, floor_segment: {}, segments_per_tier: {}",
183+
maxSegmentSize,
184+
floorSegmentSize,
185+
segmentsPerTier
186+
);
187+
}
188+
189+
private void applyDefaultSettings() {
190+
// Fallback to the original default settings
191+
tieredMergePolicy.setMaxMergedSegmentMB(5 * 1024); // 5GB
192+
tieredMergePolicy.setFloorSegmentMB(16); // 16MB
193+
tieredMergePolicy.setSegmentsPerTier(10.0);
194+
tieredMergePolicy.setMaxMergeAtOnce(10);
195+
tieredMergePolicy.setForceMergeDeletesPctAllowed(10.0);
196+
tieredMergePolicy.setDeletesPctAllowed(20.0);
197+
tieredMergePolicy.setNoCFSRatio(TieredMergePolicy.DEFAULT_NO_CFS_RATIO);
198+
}
199+
200+
/**
201+
* Sets the store instance and reinitializes adaptive settings
202+
*/
203+
public void setStore(Store store) {
204+
this.store = store;
205+
if (store != null) {
206+
initializeAdaptiveSettings();
207+
}
208+
}
209+
210+
/**
211+
* Updates merge settings based on runtime analysis of segment topology
212+
*/
213+
public void updateSettingsBasedOnAnalysis(
214+
org.opensearch.index.analysis.SegmentTopologyAnalyzer.MergePolicyRecommendations recommendations
215+
) {
216+
if (recommendations.hasVarianceIssue || recommendations.hasSkewIssue) {
217+
logger.info("Updating merge settings based on segment topology analysis");
218+
219+
// Apply recommended settings
220+
tieredMergePolicy.setMaxMergedSegmentMB(recommendations.recommendedMaxSegmentSize / (1024 * 1024));
221+
tieredMergePolicy.setFloorSegmentMB(recommendations.recommendedFloorSegmentSize / (1024 * 1024));
222+
223+
// Adjust segments per tier based on optimal count
224+
double newSegmentsPerTier = Math.max(5.0, Math.min(20.0, recommendations.optimalSegmentCount * 0.8));
225+
tieredMergePolicy.setSegmentsPerTier(newSegmentsPerTier);
226+
227+
logger.info(
228+
"Updated merge settings - max_segment: {}MB, floor_segment: {}MB, segments_per_tier: {}",
229+
recommendations.recommendedMaxSegmentSize / (1024 * 1024),
230+
recommendations.recommendedFloorSegmentSize / (1024 * 1024),
231+
newSegmentsPerTier
232+
);
233+
}
234+
}
235+
236+
@Override
237+
public org.apache.lucene.index.MergePolicy getMergePolicy() {
238+
return mergesEnabled ? tieredMergePolicy : org.apache.lucene.index.NoMergePolicy.INSTANCE;
239+
}
240+
241+
private enum ShardSizeCategory {
242+
SMALL,
243+
MEDIUM,
244+
LARGE,
245+
VERY_LARGE
246+
}
247+
}

server/src/main/java/org/opensearch/index/IndexSettings.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ public final class IndexSettings {
104104
public enum IndexMergePolicy {
105105
TIERED("tiered"),
106106
LOG_BYTE_SIZE("log_byte_size"),
107+
ADAPTIVE("adaptive"),
107108
DEFAULT_POLICY(IndexSettings.DEFAULT_POLICY);
108109

109110
private final String value;
@@ -904,6 +905,7 @@ public static IndexMergePolicy fromString(String text) {
904905
private final MergeSchedulerConfig mergeSchedulerConfig;
905906
private final TieredMergePolicyProvider tieredMergePolicyProvider;
906907
private final LogByteSizeMergePolicyProvider logByteSizeMergePolicyProvider;
908+
private final AdaptiveTieredMergePolicyProvider adaptiveTieredMergePolicyProvider;
907909
private final IndexSortConfig indexSortConfig;
908910
private final IndexScopedSettings scopedSettings;
909911
private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis();
@@ -1147,6 +1149,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
11471149
maxRegexLength = scopedSettings.get(MAX_REGEX_LENGTH_SETTING);
11481150
this.tieredMergePolicyProvider = new TieredMergePolicyProvider(logger, this);
11491151
this.logByteSizeMergePolicyProvider = new LogByteSizeMergePolicyProvider(logger, this);
1152+
this.adaptiveTieredMergePolicyProvider = new AdaptiveTieredMergePolicyProvider(logger, this);
11501153
this.indexSortConfig = new IndexSortConfig(this);
11511154
searchIdleAfter = scopedSettings.get(INDEX_SEARCH_IDLE_AFTER);
11521155
defaultPipeline = scopedSettings.get(DEFAULT_PIPELINE);
@@ -1956,6 +1959,9 @@ public MergePolicy getMergePolicy(boolean isTimeSeriesIndex) {
19561959
case LOG_BYTE_SIZE:
19571960
mergePolicyProvider = logByteSizeMergePolicyProvider;
19581961
break;
1962+
case ADAPTIVE:
1963+
mergePolicyProvider = adaptiveTieredMergePolicyProvider;
1964+
break;
19591965
case DEFAULT_POLICY:
19601966
if (isTimeSeriesIndex) {
19611967
String nodeScopedTimeSeriesIndexPolicy = TIME_SERIES_INDEX_MERGE_POLICY.get(nodeSettings);
@@ -1968,6 +1974,9 @@ public MergePolicy getMergePolicy(boolean isTimeSeriesIndex) {
19681974
case LOG_BYTE_SIZE:
19691975
mergePolicyProvider = logByteSizeMergePolicyProvider;
19701976
break;
1977+
case ADAPTIVE:
1978+
mergePolicyProvider = adaptiveTieredMergePolicyProvider;
1979+
break;
19711980
}
19721981
} else {
19731982
mergePolicyProvider = tieredMergePolicyProvider;
@@ -2280,4 +2289,11 @@ public boolean isDerivedSourceEnabledForTranslog() {
22802289
public boolean isDerivedSourceEnabled() {
22812290
return derivedSourceEnabled;
22822291
}
2292+
2293+
/**
2294+
* Returns the adaptive tiered merge policy provider
2295+
*/
2296+
public AdaptiveTieredMergePolicyProvider getAdaptiveTieredMergePolicyProvider() {
2297+
return adaptiveTieredMergePolicyProvider;
2298+
}
22832299
}

0 commit comments

Comments
 (0)