diff --git a/CHANGELOG.md b/CHANGELOG.md
index 249ce03e2e73c..9ad95e6ebcd5b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 
 ### Fixed
 - Fix flaky test FieldDataLoadingIT.testIndicesFieldDataCacheSizeSetting ([#19571](https://github.com/opensearch-project/OpenSearch/pull/19571))
+- Avoid primary shard failure caused by merged segment warmer exceptions ([#19436](https://github.com/opensearch-project/OpenSearch/pull/19436))
 
 ### Dependencies
 - Bump `com.azure:azure-core-http-netty` from 1.15.12 to 1.16.1 ([#19533](https://github.com/opensearch-project/OpenSearch/pull/19533))
diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/MergedSegmentWarmerIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/MergedSegmentWarmerIT.java
index 5ee8b1aa738fa..9d58f4fe5d136 100644
--- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/MergedSegmentWarmerIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/MergedSegmentWarmerIT.java
@@ -14,6 +14,7 @@
 import org.opensearch.action.admin.indices.segments.IndexShardSegments;
 import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
 import org.opensearch.action.admin.indices.segments.ShardSegments;
+import org.opensearch.action.index.IndexRequestBuilder;
 import org.opensearch.action.support.WriteRequest;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.unit.TimeValue;
@@ -38,6 +39,7 @@
 import java.util.stream.Collectors;
 
 import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.equalTo;
 
 /**
  * This class runs Segment Replication Integ test suite with merged segment warmer enabled.
@@ -59,6 +61,33 @@ protected Settings featureFlagSettings() {
         return featureSettings.build();
     }
 
+    public void testPrimaryNodeRestart() throws Exception {
+        logger.info("--> starting node");
+        internalCluster().startNode();
+
+        logger.info("--> creating test index: {}", INDEX_NAME);
+        createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put("number_of_shards", 1).put("number_of_replicas", 0).build());
+
+        ensureGreen();
+
+        logger.info("--> indexing sample data");
+        final int numDocs = 100;
+        final IndexRequestBuilder[] docs = new IndexRequestBuilder[numDocs];
+
+        for (int i = 0; i < numDocs; i++) {
+            docs[i] = client().prepareIndex(INDEX_NAME)
+                .setSource("foo-int", randomInt(), "foo-string", randomAlphaOfLength(32), "foo-float", randomFloat());
+        }
+
+        indexRandom(true, docs);
+        flush();
+        assertThat(client().prepareSearch(INDEX_NAME).setSize(0).get().getHits().getTotalHits().value(), equalTo((long) numDocs));
+
+        logger.info("--> restarting cluster");
+        internalCluster().fullRestart();
+        ensureGreen();
+    }
+
     public void testMergeSegmentWarmer() throws Exception {
         final String primaryNode = internalCluster().startDataOnlyNode();
         final String replicaNode = internalCluster().startDataOnlyNode();
diff --git a/server/src/main/java/org/opensearch/index/engine/MergedSegmentWarmer.java b/server/src/main/java/org/opensearch/index/engine/MergedSegmentWarmer.java
index 29cbcdd04484e..b3bec459eb1aa 100644
--- a/server/src/main/java/org/opensearch/index/engine/MergedSegmentWarmer.java
+++ b/server/src/main/java/org/opensearch/index/engine/MergedSegmentWarmer.java
@@ -51,29 +51,39 @@ public MergedSegmentWarmer(
 
     @Override
     public void warm(LeafReader leafReader) throws IOException {
-        if (shouldWarm() == false) {
-            return;
-        }
-        // IndexWriter.IndexReaderWarmer#warm is called by IndexWriter#mergeMiddle. The type of leafReader should be SegmentReader.
-        assert leafReader instanceof SegmentReader;
-        assert indexShard.indexSettings().isSegRepLocalEnabled() || indexShard.indexSettings().isRemoteStoreEnabled();
+        try {
+            if (shouldWarm() == false) {
+                return;
+            }
+            // IndexWriter.IndexReaderWarmer#warm is called by IndexWriter#mergeMiddle. The type of leafReader should be SegmentReader.
+            assert leafReader instanceof SegmentReader;
+            assert indexShard.indexSettings().isSegRepLocalEnabled() || indexShard.indexSettings().isRemoteStoreEnabled();
 
-        long startTime = System.currentTimeMillis();
-        SegmentCommitInfo segmentCommitInfo = ((SegmentReader) leafReader).getSegmentInfo();
-        logger.trace(() -> new ParameterizedMessage("Warming segment: {}", segmentCommitInfo));
-        indexShard.publishMergedSegment(segmentCommitInfo);
-        logger.trace(() -> {
-            long segmentSize = -1;
-            try {
-                segmentSize = segmentCommitInfo.sizeInBytes();
-            } catch (IOException ignored) {}
-            return new ParameterizedMessage(
-                "Completed segment warming for {}. Size: {}B, Timing: {}ms",
-                segmentCommitInfo.info.name,
-                segmentSize,
-                (System.currentTimeMillis() - startTime)
+            long startTime = System.currentTimeMillis();
+            SegmentCommitInfo segmentCommitInfo = ((SegmentReader) leafReader).getSegmentInfo();
+            logger.info(() -> new ParameterizedMessage("Warming segment: {}", segmentCommitInfo));
+            indexShard.publishMergedSegment(segmentCommitInfo);
+            logger.trace(() -> {
+                long segmentSize = -1;
+                try {
+                    segmentSize = segmentCommitInfo.sizeInBytes();
+                } catch (IOException ignored) {}
+                return new ParameterizedMessage(
+                    "Completed segment warming for {}. Size: {}B, Timing: {}ms",
+                    segmentCommitInfo.info.name,
+                    segmentSize,
+                    (System.currentTimeMillis() - startTime)
+                );
+            });
+        } catch (Exception e) {
+            logger.warn(
+                () -> new ParameterizedMessage(
+                    "Exception thrown during merged segment warming; skipping warming of merged segment {}",
+                    ((SegmentReader) leafReader).getSegmentName()
+                ),
+                e
             );
-        });
+        }
     }
 
     // package-private for tests
diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java
index 43eda929b7aa4..5361d878b618e 100644
--- a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java
+++ b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java
@@ -637,6 +637,14 @@ public void testMergedSegmentReplication() throws Exception {
         super.testMergedSegmentReplication();
     }
 
+    @LockFeatureFlag(MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG)
+    @Override
+    @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/pull/19436")
+    public void testMergedSegmentReplicationWithException() throws Exception {
+        // TODO: wait for remote store to support merged segment warmer
+        super.testMergedSegmentReplicationWithException();
+    }
+
     @LockFeatureFlag(MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG)
     @Override
     @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/pull/18255")
diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java
index f416a022ce3b7..5898d54e90124 100644
--- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java
+++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java
@@ -60,6 +60,7 @@
 import org.opensearch.indices.replication.SegmentReplicationState;
 import org.opensearch.indices.replication.SegmentReplicationTarget;
 import org.opensearch.indices.replication.SegmentReplicationTargetService;
+import org.opensearch.indices.replication.checkpoint.MergedSegmentPublisher;
 import org.opensearch.indices.replication.checkpoint.ReferencedSegmentsCheckpoint;
 import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
 import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
@@ -75,6 +76,7 @@
 import org.opensearch.snapshots.SnapshotInfoTests;
 import org.opensearch.snapshots.SnapshotShardsService;
 import org.opensearch.test.VersionUtils;
+import org.opensearch.test.junit.annotations.TestLogging;
 import org.opensearch.threadpool.ThreadPool;
 import org.opensearch.transport.TransportService;
 import org.junit.Assert;
@@ -156,9 +158,69 @@ public void testReplication() throws Exception {
     }
 
     @LockFeatureFlag(MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG)
+    @TestLogging(reason = "Getting trace logs from MergedSegmentWarmer", value = "org.opensearch.index.engine.MergedSegmentWarmer:TRACE")
     public void testMergedSegmentReplication() throws Exception {
         // Test that the pre-copy merged segment logic does not block the merge process of the primary shard when there are 1 replica shard.
-        try (ReplicationGroup shards = createGroup(1, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory());) {
+        final RecoverySettings recoverySettings = new RecoverySettings(
+            Settings.builder().put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), true).build(),
+            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
+        );
+        try (
+            ReplicationGroup shards = createGroup(
+                1,
+                getIndexSettings(),
+                indexMapping,
+                new NRTReplicationEngineFactory(),
+                recoverySettings,
+                MergedSegmentPublisher.EMPTY
+            )
+        ) {
+            shards.startAll();
+            final IndexShard primaryShard = shards.getPrimary();
+            final IndexShard replicaShard = shards.getReplicas().get(0);
+
+            // index and replicate segments to replica.
+            int numDocs = randomIntBetween(10, 20);
+            shards.indexDocs(numDocs);
+            primaryShard.refresh("test");
+            flushShard(primaryShard);
+
+            shards.indexDocs(numDocs);
+            primaryShard.refresh("test");
+            flushShard(primaryShard);
+            replicateSegments(primaryShard, List.of(replicaShard));
+            shards.assertAllEqual(2 * numDocs);
+
+            primaryShard.forceMerge(new ForceMergeRequest("test").maxNumSegments(1));
+            replicateMergedSegments(primaryShard, List.of(replicaShard));
+            primaryShard.refresh("test");
+            assertEquals(1, primaryShard.segments(false).size());
+            // After the pre-copy merged segment is completed, the merged segment is not visible in the replica, and the number of segments
+            // in the replica shard is still 2.
+            assertEquals(2, replicaShard.segments(false).size());
+        }
+    }
+
+    @LockFeatureFlag(MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG)
+    public void testMergedSegmentReplicationWithException() throws Exception {
+        // Test that an exception from pre-copy merged segment warming does not cause the primary shard to fail
+        MergedSegmentPublisher mergedSegmentPublisherWithException = new MergedSegmentPublisher((indexShard, checkpoint) -> {
+            throw new RuntimeException("mock exception");
+        });
+        final RecoverySettings recoverySettings = new RecoverySettings(
+            Settings.builder().put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), true).build(),
+            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
+        );
+        try (
+            ReplicationGroup shards = createGroup(
+                1,
+                getIndexSettings(),
+                indexMapping,
+                new NRTReplicationEngineFactory(),
+                recoverySettings,
+                mergedSegmentPublisherWithException
+            )
+        ) {
             shards.startAll();
             final IndexShard primaryShard = shards.getPrimary();
             final IndexShard replicaShard = shards.getReplicas().get(0);
@@ -188,7 +250,20 @@ public void testMergedSegmentReplication() throws Exception {
     @LockFeatureFlag(MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG)
     public void testMergedSegmentReplicationWithZeroReplica() throws Exception {
         // Test that the pre-copy merged segment logic does not block the merge process of the primary shard when there are 0 replica shard.
-        try (ReplicationGroup shards = createGroup(0, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory());) {
+        final RecoverySettings recoverySettings = new RecoverySettings(
+            Settings.builder().put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), true).build(),
+            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
+        );
+        try (
+            ReplicationGroup shards = createGroup(
+                0,
+                getIndexSettings(),
+                indexMapping,
+                new NRTReplicationEngineFactory(),
+                recoverySettings,
+                MergedSegmentPublisher.EMPTY
+            )
+        ) {
             shards.startAll();
             final IndexShard primaryShard = shards.getPrimary();
 
@@ -210,7 +285,20 @@ public void testMergedSegmentReplicationWithZeroReplica() throws Exception {
 
     @LockFeatureFlag(MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG)
     public void testCleanupRedundantPendingMergeSegment() throws Exception {
-        try (ReplicationGroup shards = createGroup(1, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory());) {
+        final RecoverySettings recoverySettings = new RecoverySettings(
+            Settings.builder().put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), true).build(),
+            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
+        );
+        try (
+            ReplicationGroup shards = createGroup(
+                1,
+                getIndexSettings(),
+                indexMapping,
+                new NRTReplicationEngineFactory(),
+                recoverySettings,
+                MergedSegmentPublisher.EMPTY
+            )
+        ) {
             shards.startAll();
             final IndexShard primaryShard = shards.getPrimary();
             final IndexShard replicaShard = shards.getReplicas().get(0);
diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java
index bcc4f8d77e72b..333c90ac008b7 100644
--- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java
@@ -101,8 +101,11 @@
 import org.opensearch.index.shard.PrimaryReplicaSyncer;
 import org.opensearch.index.shard.ShardPath;
 import org.opensearch.index.translog.Translog;
+import org.opensearch.indices.recovery.DefaultRecoverySettings;
+import org.opensearch.indices.recovery.RecoverySettings;
 import org.opensearch.indices.recovery.RecoveryState;
 import org.opensearch.indices.recovery.RecoveryTarget;
+import org.opensearch.indices.replication.checkpoint.MergedSegmentPublisher;
 import org.opensearch.tasks.TaskManager;
 import org.opensearch.threadpool.ThreadPool;
 import org.opensearch.threadpool.ThreadPool.Names;
@@ -150,17 +153,48 @@ protected ReplicationGroup createGroup(int replicas, Settings settings, EngineFa
 
     protected ReplicationGroup createGroup(int replicas, Settings settings, String mappings, EngineFactory engineFactory)
         throws IOException {
+        return createGroup(replicas, settings, mappings, engineFactory, DefaultRecoverySettings.INSTANCE, MergedSegmentPublisher.EMPTY);
+    }
+
+    protected ReplicationGroup createGroup(
+        int replicas,
+        Settings settings,
+        String mappings,
+        EngineFactory engineFactory,
+        RecoverySettings recoverySettings,
+        MergedSegmentPublisher mergedSegmentPublisher
+    ) throws IOException {
         Path remotePath = null;
         if ("true".equals(settings.get(IndexMetadata.SETTING_REMOTE_STORE_ENABLED))) {
            remotePath = createTempDir();
         }
-        return createGroup(replicas, settings, mappings, engineFactory, remotePath);
+        return createGroup(replicas, settings, mappings, engineFactory, recoverySettings, remotePath, mergedSegmentPublisher);
     }
 
     protected ReplicationGroup createGroup(int replicas, Settings settings, String mappings, EngineFactory engineFactory, Path remotePath)
         throws IOException {
+        return createGroup(
+            replicas,
+            settings,
+            mappings,
+            engineFactory,
+            DefaultRecoverySettings.INSTANCE,
+            remotePath,
+            MergedSegmentPublisher.EMPTY
+        );
+    }
+
+    protected ReplicationGroup createGroup(
+        int replicas,
+        Settings settings,
+        String mappings,
+        EngineFactory engineFactory,
+        RecoverySettings recoverySettings,
+        Path remotePath,
+        MergedSegmentPublisher mergedSegmentPublisher
+    ) throws IOException {
         IndexMetadata metadata = buildIndexMetadata(replicas, settings, mappings);
-        return new ReplicationGroup(metadata, remotePath) {
+        return new ReplicationGroup(metadata, recoverySettings, remotePath, mergedSegmentPublisher) {
             @Override
             protected EngineFactory getEngineFactory(ShardRouting routing) {
                 return engineFactory;
@@ -253,6 +287,15 @@ protected ReplicationGroup(final IndexMetadata indexMetadata) throws IOException
         }
 
         protected ReplicationGroup(final IndexMetadata indexMetadata, Path remotePath) throws IOException {
+            this(indexMetadata, DefaultRecoverySettings.INSTANCE, remotePath, MergedSegmentPublisher.EMPTY);
+        }
+
+        protected ReplicationGroup(
+            final IndexMetadata indexMetadata,
+            RecoverySettings recoverySettings,
+            Path remotePath,
+            MergedSegmentPublisher mergedSegmentPublisher
+        ) throws IOException {
             final ShardRouting primaryRouting = this.createShardRouting("s0", true);
             primary = newShard(
                 primaryRouting,
@@ -261,7 +304,9 @@ protected ReplicationGroup(final IndexMetadata indexMetadata, Path remotePath) t
                 getEngineFactory(primaryRouting),
                 () -> {},
                 retentionLeaseSyncer,
-                remotePath
+                recoverySettings,
+                remotePath,
+                mergedSegmentPublisher
             );
             replicas = new CopyOnWriteArrayList<>();
             this.indexMetadata = indexMetadata;
diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
index a300e2c9cc717..6d2bdbc8510f6 100644
--- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
@@ -485,6 +485,44 @@ protected IndexShard newShard(
         RetentionLeaseSyncer retentionLeaseSyncer,
         Path path,
         IndexingOperationListener... listeners
+    ) throws IOException {
+        return newShard(
+            routing,
+            indexMetadata,
+            indexReaderWrapper,
+            engineFactory,
+            globalCheckpointSyncer,
+            retentionLeaseSyncer,
+            DefaultRecoverySettings.INSTANCE,
+            path,
+            MergedSegmentPublisher.EMPTY,
+            listeners
+        );
+    }
+
+    /**
+     * creates a new initializing shard. The shard will be put in its proper path under the
+     * current node id the shard is assigned to.
+     * @param routing shard routing to use
+     * @param indexMetadata indexMetadata for the shard, including any mapping
+     * @param indexReaderWrapper an optional wrapper to be used during search
+     * @param globalCheckpointSyncer callback for syncing global checkpoints
+     * @param recoverySettings recovery settings
+     * @param path remote path
+     * @param mergedSegmentPublisher merged segment publisher
+     * @param listeners an optional set of listeners to add to the shard
+     */
+    protected IndexShard newShard(
+        ShardRouting routing,
+        IndexMetadata indexMetadata,
+        @Nullable CheckedFunction<DirectoryReader, DirectoryReader, IOException> indexReaderWrapper,
+        @Nullable EngineFactory engineFactory,
+        Runnable globalCheckpointSyncer,
+        RetentionLeaseSyncer retentionLeaseSyncer,
+        RecoverySettings recoverySettings,
+        Path path,
+        MergedSegmentPublisher mergedSegmentPublisher,
+        IndexingOperationListener... listeners
     ) throws IOException {
         // add node id as name to settings for proper logging
         final ShardId shardId = routing.shardId();
@@ -501,7 +539,10 @@ protected IndexShard newShard(
             globalCheckpointSyncer,
             retentionLeaseSyncer,
             EMPTY_EVENT_LISTENER,
+            SegmentReplicationCheckpointPublisher.EMPTY,
+            recoverySettings,
             path,
+            mergedSegmentPublisher,
             listeners
         );
     }
@@ -544,7 +585,9 @@ protected IndexShard newShard(
             retentionLeaseSyncer,
             indexEventListener,
             SegmentReplicationCheckpointPublisher.EMPTY,
+            DefaultRecoverySettings.INSTANCE,
             remotePath,
+            MergedSegmentPublisher.EMPTY,
             listeners
         );
     }
@@ -597,7 +640,9 @@ protected IndexShard newShard(boolean primary, SegmentReplicationCheckpointPubli
             RetentionLeaseSyncer.EMPTY,
             EMPTY_EVENT_LISTENER,
             checkpointPublisher,
-            null
+            DefaultRecoverySettings.INSTANCE,
+            null,
+            MergedSegmentPublisher.EMPTY
         );
     }
 
@@ -611,6 +656,9 @@ protected IndexShard newShard(boolean primary, SegmentReplicationCheckpointPubli
     * @param globalCheckpointSyncer callback for syncing global checkpoints
     * @param indexEventListener index event listener
     * @param checkpointPublisher segment Replication Checkpoint Publisher to publish checkpoint
+    * @param recoverySettings recovery settings
+    * @param remotePath remote path
+    * @param mergedSegmentPublisher merged segment publisher
     * @param listeners an optional set of listeners to add to the shard
     */
    protected IndexShard newShard(
@@ -625,7 +673,9 @@ protected IndexShard newShard(
         RetentionLeaseSyncer retentionLeaseSyncer,
         IndexEventListener indexEventListener,
         SegmentReplicationCheckpointPublisher checkpointPublisher,
+        RecoverySettings recoverySettings,
         @Nullable Path remotePath,
+        MergedSegmentPublisher mergedSegmentPublisher,
         IndexingOperationListener... listeners
     ) throws IOException {
         Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build();
@@ -726,7 +776,7 @@ protected IndexShard newShard(
             remoteStore,
             remoteStoreStatsTrackerFactory,
             "dummy-node",
-            DefaultRecoverySettings.INSTANCE,
+            recoverySettings,
             DefaultRemoteStoreSettings.INSTANCE,
             false,
             discoveryNodes,
@@ -737,7 +787,7 @@ protected IndexShard newShard(
             indexSettings::getRefreshInterval,
             new Object(),
             clusterService.getClusterApplierService(),
-            MergedSegmentPublisher.EMPTY,
+            mergedSegmentPublisher,
             ReferencedSegmentsPublisher.EMPTY
         );
         indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER);