Skip to content

Commit 1fff3d2

Browse files
guojialiang92Peter Alfonsi
authored andcommitted
Avoid primary shard failure caused by merged segment warmer exceptions (opensearch-project#19436)
* Avoid primary shard failure caused by merge segment warmer exceptions Signed-off-by: guojialiang <[email protected]> * add test Signed-off-by: guojialiang <[email protected]> * update Signed-off-by: guojialiang <[email protected]> * update Signed-off-by: guojialiang <[email protected]> * update Signed-off-by: guojialiang <[email protected]> * add change log Signed-off-by: guojialiang <[email protected]> --------- Signed-off-by: guojialiang <[email protected]>
1 parent eda9015 commit 1fff3d2

File tree

7 files changed

+261
-30
lines changed

7 files changed

+261
-30
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
6868

6969
### Fixed
7070
- Fix flaky test FieldDataLoadingIT.testIndicesFieldDataCacheSizeSetting ([#19571](https://github.com/opensearch-project/OpenSearch/pull/19571))
71+
- Avoid primary shard failure caused by merged segment warmer exceptions ([#19436](https://github.com/opensearch-project/OpenSearch/pull/19436))
7172

7273
### Dependencies
7374
- Bump `peter-evans/create-or-update-comment` from 4 to 5 ([#19536](https://github.com/opensearch-project/OpenSearch/pull/19536))

server/src/internalClusterTest/java/org/opensearch/indices/replication/MergedSegmentWarmerIT.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.opensearch.action.admin.indices.segments.IndexShardSegments;
1515
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
1616
import org.opensearch.action.admin.indices.segments.ShardSegments;
17+
import org.opensearch.action.index.IndexRequestBuilder;
1718
import org.opensearch.action.support.WriteRequest;
1819
import org.opensearch.common.settings.Settings;
1920
import org.opensearch.common.unit.TimeValue;
@@ -38,6 +39,7 @@
3839
import java.util.stream.Collectors;
3940

4041
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
42+
import static org.hamcrest.Matchers.equalTo;
4143

4244
/**
4345
* This class runs Segment Replication Integ test suite with merged segment warmer enabled.
@@ -59,6 +61,33 @@ protected Settings featureFlagSettings() {
5961
return featureSettings.build();
6062
}
6163

64+
public void testPrimaryNodeRestart() throws Exception {
65+
logger.info("--> start nodes");
66+
internalCluster().startNode();
67+
68+
logger.info("--> creating test index: {}", INDEX_NAME);
69+
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put("number_of_shards", 1).put("number_of_replicas", 0).build());
70+
71+
ensureGreen();
72+
73+
logger.info("--> indexing sample data");
74+
final int numDocs = 100;
75+
final IndexRequestBuilder[] docs = new IndexRequestBuilder[numDocs];
76+
77+
for (int i = 0; i < numDocs; i++) {
78+
docs[i] = client().prepareIndex(INDEX_NAME)
79+
.setSource("foo-int", randomInt(), "foo-string", randomAlphaOfLength(32), "foo-float", randomFloat());
80+
}
81+
82+
indexRandom(true, docs);
83+
flush();
84+
assertThat(client().prepareSearch(INDEX_NAME).setSize(0).get().getHits().getTotalHits().value(), equalTo((long) numDocs));
85+
86+
logger.info("--> restarting cluster");
87+
internalCluster().fullRestart();
88+
ensureGreen();
89+
}
90+
6291
public void testMergeSegmentWarmer() throws Exception {
6392
final String primaryNode = internalCluster().startDataOnlyNode();
6493
final String replicaNode = internalCluster().startDataOnlyNode();

server/src/main/java/org/opensearch/index/engine/MergedSegmentWarmer.java

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -51,29 +51,39 @@ public MergedSegmentWarmer(
5151

5252
@Override
5353
public void warm(LeafReader leafReader) throws IOException {
54-
if (shouldWarm() == false) {
55-
return;
56-
}
57-
// IndexWriter.IndexReaderWarmer#warm is called by IndexWriter#mergeMiddle. The type of leafReader should be SegmentReader.
58-
assert leafReader instanceof SegmentReader;
59-
assert indexShard.indexSettings().isSegRepLocalEnabled() || indexShard.indexSettings().isRemoteStoreEnabled();
54+
try {
55+
if (shouldWarm() == false) {
56+
return;
57+
}
58+
// IndexWriter.IndexReaderWarmer#warm is called by IndexWriter#mergeMiddle. The type of leafReader should be SegmentReader.
59+
assert leafReader instanceof SegmentReader;
60+
assert indexShard.indexSettings().isSegRepLocalEnabled() || indexShard.indexSettings().isRemoteStoreEnabled();
6061

61-
long startTime = System.currentTimeMillis();
62-
SegmentCommitInfo segmentCommitInfo = ((SegmentReader) leafReader).getSegmentInfo();
63-
logger.trace(() -> new ParameterizedMessage("Warming segment: {}", segmentCommitInfo));
64-
indexShard.publishMergedSegment(segmentCommitInfo);
65-
logger.trace(() -> {
66-
long segmentSize = -1;
67-
try {
68-
segmentSize = segmentCommitInfo.sizeInBytes();
69-
} catch (IOException ignored) {}
70-
return new ParameterizedMessage(
71-
"Completed segment warming for {}. Size: {}B, Timing: {}ms",
72-
segmentCommitInfo.info.name,
73-
segmentSize,
74-
(System.currentTimeMillis() - startTime)
62+
long startTime = System.currentTimeMillis();
63+
SegmentCommitInfo segmentCommitInfo = ((SegmentReader) leafReader).getSegmentInfo();
64+
logger.info(() -> new ParameterizedMessage("Warming segment: {}", segmentCommitInfo));
65+
indexShard.publishMergedSegment(segmentCommitInfo);
66+
logger.trace(() -> {
67+
long segmentSize = -1;
68+
try {
69+
segmentSize = segmentCommitInfo.sizeInBytes();
70+
} catch (IOException ignored) {}
71+
return new ParameterizedMessage(
72+
"Completed segment warming for {}. Size: {}B, Timing: {}ms",
73+
segmentCommitInfo.info.name,
74+
segmentSize,
75+
(System.currentTimeMillis() - startTime)
76+
);
77+
});
78+
} catch (Exception e) {
79+
logger.warn(
80+
() -> new ParameterizedMessage(
81+
"Throw exception during merged segment warmer, skip merged segment {} warmer",
82+
((SegmentReader) leafReader).getSegmentName()
83+
),
84+
e
7585
);
76-
});
86+
}
7787
}
7888

7989
// package-private for tests

server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -637,6 +637,14 @@ public void testMergedSegmentReplication() throws Exception {
637637
super.testMergedSegmentReplication();
638638
}
639639

640+
@LockFeatureFlag(MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG)
641+
@Override
642+
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/pull/19436")
643+
public void testMergedSegmentReplicationWithException() throws Exception {
644+
// TODO: wait for remote store to support merged segment warmer
645+
super.testMergedSegmentReplicationWithException();
646+
}
647+
640648
@LockFeatureFlag(MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG)
641649
@Override
642650
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/pull/18255")

server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java

Lines changed: 91 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import org.opensearch.indices.replication.SegmentReplicationState;
6161
import org.opensearch.indices.replication.SegmentReplicationTarget;
6262
import org.opensearch.indices.replication.SegmentReplicationTargetService;
63+
import org.opensearch.indices.replication.checkpoint.MergedSegmentPublisher;
6364
import org.opensearch.indices.replication.checkpoint.ReferencedSegmentsCheckpoint;
6465
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
6566
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
@@ -75,6 +76,7 @@
7576
import org.opensearch.snapshots.SnapshotInfoTests;
7677
import org.opensearch.snapshots.SnapshotShardsService;
7778
import org.opensearch.test.VersionUtils;
79+
import org.opensearch.test.junit.annotations.TestLogging;
7880
import org.opensearch.threadpool.ThreadPool;
7981
import org.opensearch.transport.TransportService;
8082
import org.junit.Assert;
@@ -156,9 +158,69 @@ public void testReplication() throws Exception {
156158
}
157159

158160
@LockFeatureFlag(MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG)
161+
@TestLogging(reason = "Getting trace logs from MergedSegmentWarmer", value = "org.opensearch.index.engine.MergedSegmentWarmer:TRACE")
159162
public void testMergedSegmentReplication() throws Exception {
160163
// Test that the pre-copy merged segment logic does not block the merge process of the primary shard when there are 1 replica shard.
161-
try (ReplicationGroup shards = createGroup(1, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory());) {
164+
final RecoverySettings recoverySettings = new RecoverySettings(
165+
Settings.builder().put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), true).build(),
166+
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
167+
);
168+
try (
169+
ReplicationGroup shards = createGroup(
170+
1,
171+
getIndexSettings(),
172+
indexMapping,
173+
new NRTReplicationEngineFactory(),
174+
recoverySettings,
175+
MergedSegmentPublisher.EMPTY
176+
)
177+
) {
178+
shards.startAll();
179+
final IndexShard primaryShard = shards.getPrimary();
180+
final IndexShard replicaShard = shards.getReplicas().get(0);
181+
182+
// index and replicate segments to replica.
183+
int numDocs = randomIntBetween(10, 20);
184+
shards.indexDocs(numDocs);
185+
primaryShard.refresh("test");
186+
flushShard(primaryShard);
187+
188+
shards.indexDocs(numDocs);
189+
primaryShard.refresh("test");
190+
flushShard(primaryShard);
191+
replicateSegments(primaryShard, List.of(replicaShard));
192+
shards.assertAllEqual(2 * numDocs);
193+
194+
primaryShard.forceMerge(new ForceMergeRequest("test").maxNumSegments(1));
195+
replicateMergedSegments(primaryShard, List.of(replicaShard));
196+
primaryShard.refresh("test");
197+
assertEquals(1, primaryShard.segments(false).size());
198+
// After the pre-copy merged segment is completed, the merged segment is not visible in the replica, and the number of segments
199+
// in the replica shard is still 2.
200+
assertEquals(2, replicaShard.segments(false).size());
201+
}
202+
}
203+
204+
@LockFeatureFlag(MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG)
205+
public void testMergedSegmentReplicationWithException() throws Exception {
206+
// Test that the pre-copy merged segment exception will not cause primary shard to fail
207+
MergedSegmentPublisher mergedSegmentPublisherWithException = new MergedSegmentPublisher((indexShard, checkpoint) -> {
208+
throw new RuntimeException("mock exception");
209+
});
210+
final RecoverySettings recoverySettings = new RecoverySettings(
211+
Settings.builder().put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), true).build(),
212+
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
213+
);
214+
try (
215+
ReplicationGroup shards = createGroup(
216+
1,
217+
getIndexSettings(),
218+
indexMapping,
219+
new NRTReplicationEngineFactory(),
220+
recoverySettings,
221+
mergedSegmentPublisherWithException
222+
)
223+
) {
162224
shards.startAll();
163225
final IndexShard primaryShard = shards.getPrimary();
164226
final IndexShard replicaShard = shards.getReplicas().get(0);
@@ -188,7 +250,20 @@ public void testMergedSegmentReplication() throws Exception {
188250
@LockFeatureFlag(MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG)
189251
public void testMergedSegmentReplicationWithZeroReplica() throws Exception {
190252
// Test that the pre-copy merged segment logic does not block the merge process of the primary shard when there are 0 replica shard.
191-
try (ReplicationGroup shards = createGroup(0, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory());) {
253+
final RecoverySettings recoverySettings = new RecoverySettings(
254+
Settings.builder().put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), true).build(),
255+
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
256+
);
257+
try (
258+
ReplicationGroup shards = createGroup(
259+
0,
260+
getIndexSettings(),
261+
indexMapping,
262+
new NRTReplicationEngineFactory(),
263+
recoverySettings,
264+
MergedSegmentPublisher.EMPTY
265+
)
266+
) {
192267
shards.startAll();
193268
final IndexShard primaryShard = shards.getPrimary();
194269

@@ -210,7 +285,20 @@ public void testMergedSegmentReplicationWithZeroReplica() throws Exception {
210285

211286
@LockFeatureFlag(MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG)
212287
public void testCleanupRedundantPendingMergeSegment() throws Exception {
213-
try (ReplicationGroup shards = createGroup(1, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory());) {
288+
final RecoverySettings recoverySettings = new RecoverySettings(
289+
Settings.builder().put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), true).build(),
290+
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
291+
);
292+
try (
293+
ReplicationGroup shards = createGroup(
294+
1,
295+
getIndexSettings(),
296+
indexMapping,
297+
new NRTReplicationEngineFactory(),
298+
recoverySettings,
299+
MergedSegmentPublisher.EMPTY
300+
)
301+
) {
214302
shards.startAll();
215303
final IndexShard primaryShard = shards.getPrimary();
216304
final IndexShard replicaShard = shards.getReplicas().get(0);

test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,11 @@
101101
import org.opensearch.index.shard.PrimaryReplicaSyncer;
102102
import org.opensearch.index.shard.ShardPath;
103103
import org.opensearch.index.translog.Translog;
104+
import org.opensearch.indices.recovery.DefaultRecoverySettings;
105+
import org.opensearch.indices.recovery.RecoverySettings;
104106
import org.opensearch.indices.recovery.RecoveryState;
105107
import org.opensearch.indices.recovery.RecoveryTarget;
108+
import org.opensearch.indices.replication.checkpoint.MergedSegmentPublisher;
106109
import org.opensearch.tasks.TaskManager;
107110
import org.opensearch.threadpool.ThreadPool;
108111
import org.opensearch.threadpool.ThreadPool.Names;
@@ -150,17 +153,48 @@ protected ReplicationGroup createGroup(int replicas, Settings settings, EngineFa
150153

151154
protected ReplicationGroup createGroup(int replicas, Settings settings, String mappings, EngineFactory engineFactory)
152155
throws IOException {
156+
return createGroup(replicas, settings, mappings, engineFactory, DefaultRecoverySettings.INSTANCE, MergedSegmentPublisher.EMPTY);
157+
}
158+
159+
protected ReplicationGroup createGroup(
160+
int replicas,
161+
Settings settings,
162+
String mappings,
163+
EngineFactory engineFactory,
164+
RecoverySettings recoverySettings,
165+
MergedSegmentPublisher mergedSegmentPublisher
166+
) throws IOException {
153167
Path remotePath = null;
154168
if ("true".equals(settings.get(IndexMetadata.SETTING_REMOTE_STORE_ENABLED))) {
155169
remotePath = createTempDir();
156170
}
157-
return createGroup(replicas, settings, mappings, engineFactory, remotePath);
171+
return createGroup(replicas, settings, mappings, engineFactory, recoverySettings, remotePath, mergedSegmentPublisher);
158172
}
159173

160174
protected ReplicationGroup createGroup(int replicas, Settings settings, String mappings, EngineFactory engineFactory, Path remotePath)
161175
throws IOException {
176+
return createGroup(
177+
replicas,
178+
settings,
179+
mappings,
180+
engineFactory,
181+
DefaultRecoverySettings.INSTANCE,
182+
remotePath,
183+
MergedSegmentPublisher.EMPTY
184+
);
185+
}
186+
187+
protected ReplicationGroup createGroup(
188+
int replicas,
189+
Settings settings,
190+
String mappings,
191+
EngineFactory engineFactory,
192+
RecoverySettings recoverySettings,
193+
Path remotePath,
194+
MergedSegmentPublisher mergedSegmentPublisher
195+
) throws IOException {
162196
IndexMetadata metadata = buildIndexMetadata(replicas, settings, mappings);
163-
return new ReplicationGroup(metadata, remotePath) {
197+
return new ReplicationGroup(metadata, recoverySettings, remotePath, mergedSegmentPublisher) {
164198
@Override
165199
protected EngineFactory getEngineFactory(ShardRouting routing) {
166200
return engineFactory;
@@ -253,6 +287,15 @@ protected ReplicationGroup(final IndexMetadata indexMetadata) throws IOException
253287
}
254288

255289
protected ReplicationGroup(final IndexMetadata indexMetadata, Path remotePath) throws IOException {
290+
this(indexMetadata, DefaultRecoverySettings.INSTANCE, remotePath, MergedSegmentPublisher.EMPTY);
291+
}
292+
293+
protected ReplicationGroup(
294+
final IndexMetadata indexMetadata,
295+
RecoverySettings recoverySettings,
296+
Path remotePath,
297+
MergedSegmentPublisher mergedSegmentPublisher
298+
) throws IOException {
256299
final ShardRouting primaryRouting = this.createShardRouting("s0", true);
257300
primary = newShard(
258301
primaryRouting,
@@ -261,7 +304,9 @@ protected ReplicationGroup(final IndexMetadata indexMetadata, Path remotePath) t
261304
getEngineFactory(primaryRouting),
262305
() -> {},
263306
retentionLeaseSyncer,
264-
remotePath
307+
recoverySettings,
308+
remotePath,
309+
mergedSegmentPublisher
265310
);
266311
replicas = new CopyOnWriteArrayList<>();
267312
this.indexMetadata = indexMetadata;

0 commit comments

Comments
 (0)