Skip to content
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

### Fixed
- Fix flaky test FieldDataLoadingIT.testIndicesFieldDataCacheSizeSetting ([#19571](https://github.com/opensearch-project/OpenSearch/pull/19571))
- Avoid primary shard failure caused by merged segment warmer exceptions ([#19436](https://github.com/opensearch-project/OpenSearch/pull/19436))

### Dependencies
- Bump `com.azure:azure-core-http-netty` from 1.15.12 to 1.16.1 ([#19533](https://github.com/opensearch-project/OpenSearch/pull/19533))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.action.admin.indices.segments.IndexShardSegments;
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.opensearch.action.admin.indices.segments.ShardSegments;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
Expand All @@ -38,6 +39,7 @@
import java.util.stream.Collectors;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

/**
* This class runs Segment Replication Integ test suite with merged segment warmer enabled.
Expand All @@ -59,6 +61,33 @@ protected Settings featureFlagSettings() {
return featureSettings.build();
}

public void testPrimaryNodeRestart() throws Exception {
logger.info("--> start nodes");
internalCluster().startNode();

logger.info("--> creating test index: {}", INDEX_NAME);
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put("number_of_shards", 1).put("number_of_replicas", 0).build());

ensureGreen();

logger.info("--> indexing sample data");
final int numDocs = 100;
final IndexRequestBuilder[] docs = new IndexRequestBuilder[numDocs];

for (int i = 0; i < numDocs; i++) {
docs[i] = client().prepareIndex(INDEX_NAME)
.setSource("foo-int", randomInt(), "foo-string", randomAlphaOfLength(32), "foo-float", randomFloat());
}

indexRandom(true, docs);
flush();
assertThat(client().prepareSearch(INDEX_NAME).setSize(0).get().getHits().getTotalHits().value(), equalTo((long) numDocs));

logger.info("--> restarting cluster");
internalCluster().fullRestart();
ensureGreen();
}

public void testMergeSegmentWarmer() throws Exception {
final String primaryNode = internalCluster().startDataOnlyNode();
final String replicaNode = internalCluster().startDataOnlyNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,29 +51,39 @@ public MergedSegmentWarmer(

@Override
public void warm(LeafReader leafReader) throws IOException {
if (shouldWarm() == false) {
return;
}
// IndexWriter.IndexReaderWarmer#warm is called by IndexWriter#mergeMiddle. The type of leafReader should be SegmentReader.
assert leafReader instanceof SegmentReader;
assert indexShard.indexSettings().isSegRepLocalEnabled() || indexShard.indexSettings().isRemoteStoreEnabled();
try {
if (shouldWarm() == false) {
return;
}
// IndexWriter.IndexReaderWarmer#warm is called by IndexWriter#mergeMiddle. The type of leafReader should be SegmentReader.
assert leafReader instanceof SegmentReader;
assert indexShard.indexSettings().isSegRepLocalEnabled() || indexShard.indexSettings().isRemoteStoreEnabled();

long startTime = System.currentTimeMillis();
SegmentCommitInfo segmentCommitInfo = ((SegmentReader) leafReader).getSegmentInfo();
logger.trace(() -> new ParameterizedMessage("Warming segment: {}", segmentCommitInfo));
indexShard.publishMergedSegment(segmentCommitInfo);
logger.trace(() -> {
long segmentSize = -1;
try {
segmentSize = segmentCommitInfo.sizeInBytes();
} catch (IOException ignored) {}
return new ParameterizedMessage(
"Completed segment warming for {}. Size: {}B, Timing: {}ms",
segmentCommitInfo.info.name,
segmentSize,
(System.currentTimeMillis() - startTime)
long startTime = System.currentTimeMillis();
SegmentCommitInfo segmentCommitInfo = ((SegmentReader) leafReader).getSegmentInfo();
logger.info(() -> new ParameterizedMessage("Warming segment: {}", segmentCommitInfo));
indexShard.publishMergedSegment(segmentCommitInfo);
logger.trace(() -> {
long segmentSize = -1;
try {
segmentSize = segmentCommitInfo.sizeInBytes();
} catch (IOException ignored) {}
return new ParameterizedMessage(
"Completed segment warming for {}. Size: {}B, Timing: {}ms",
segmentCommitInfo.info.name,
segmentSize,
(System.currentTimeMillis() - startTime)
);
});
} catch (Exception e) {
logger.warn(
() -> new ParameterizedMessage(
"Throw exception during merged segment warmer, skip merged segment {} warmer",
((SegmentReader) leafReader).getSegmentName()
),
e
);
});
}
}

// package-private for tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,14 @@ public void testMergedSegmentReplication() throws Exception {
super.testMergedSegmentReplication();
}

@LockFeatureFlag(MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG)
@Override
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/pull/19436")
public void testMergedSegmentReplicationWithException() throws Exception {
// TODO: wait for remote store to support merged segment warmer
super.testMergedSegmentReplicationWithException();
}

@LockFeatureFlag(MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG)
@Override
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/pull/18255")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.opensearch.indices.replication.SegmentReplicationState;
import org.opensearch.indices.replication.SegmentReplicationTarget;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.checkpoint.MergedSegmentPublisher;
import org.opensearch.indices.replication.checkpoint.ReferencedSegmentsCheckpoint;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
Expand All @@ -75,6 +76,7 @@
import org.opensearch.snapshots.SnapshotInfoTests;
import org.opensearch.snapshots.SnapshotShardsService;
import org.opensearch.test.VersionUtils;
import org.opensearch.test.junit.annotations.TestLogging;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
import org.junit.Assert;
Expand Down Expand Up @@ -156,9 +158,69 @@ public void testReplication() throws Exception {
}

@LockFeatureFlag(MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG)
@TestLogging(reason = "Getting trace logs from MergedSegmentWarmer", value = "org.opensearch.index.engine.MergedSegmentWarmer:TRACE")
public void testMergedSegmentReplication() throws Exception {
// Test that the pre-copy merged segment logic does not block the merge process of the primary shard when there are 1 replica shard.
try (ReplicationGroup shards = createGroup(1, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory());) {
final RecoverySettings recoverySettings = new RecoverySettings(
Settings.builder().put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), true).build(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
);
try (
ReplicationGroup shards = createGroup(
1,
getIndexSettings(),
indexMapping,
new NRTReplicationEngineFactory(),
recoverySettings,
MergedSegmentPublisher.EMPTY
)
) {
shards.startAll();
final IndexShard primaryShard = shards.getPrimary();
final IndexShard replicaShard = shards.getReplicas().get(0);

// index and replicate segments to replica.
int numDocs = randomIntBetween(10, 20);
shards.indexDocs(numDocs);
primaryShard.refresh("test");
flushShard(primaryShard);

shards.indexDocs(numDocs);
primaryShard.refresh("test");
flushShard(primaryShard);
replicateSegments(primaryShard, List.of(replicaShard));
shards.assertAllEqual(2 * numDocs);

primaryShard.forceMerge(new ForceMergeRequest("test").maxNumSegments(1));
replicateMergedSegments(primaryShard, List.of(replicaShard));
primaryShard.refresh("test");
assertEquals(1, primaryShard.segments(false).size());
// After the pre-copy merged segment is completed, the merged segment is not visible in the replica, and the number of segments
// in the replica shard is still 2.
assertEquals(2, replicaShard.segments(false).size());
}
}

@LockFeatureFlag(MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG)
public void testMergedSegmentReplicationWithException() throws Exception {
// Test that the pre-copy merged segment exception will not cause primary shard to fail
MergedSegmentPublisher mergedSegmentPublisherWithException = new MergedSegmentPublisher((indexShard, checkpoint) -> {
throw new RuntimeException("mock exception");
});
final RecoverySettings recoverySettings = new RecoverySettings(
Settings.builder().put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), true).build(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
);
try (
ReplicationGroup shards = createGroup(
1,
getIndexSettings(),
indexMapping,
new NRTReplicationEngineFactory(),
recoverySettings,
mergedSegmentPublisherWithException
)
) {
shards.startAll();
final IndexShard primaryShard = shards.getPrimary();
final IndexShard replicaShard = shards.getReplicas().get(0);
Expand Down Expand Up @@ -188,7 +250,20 @@ public void testMergedSegmentReplication() throws Exception {
@LockFeatureFlag(MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG)
public void testMergedSegmentReplicationWithZeroReplica() throws Exception {
// Test that the pre-copy merged segment logic does not block the merge process of the primary shard when there are 0 replica shard.
try (ReplicationGroup shards = createGroup(0, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory());) {
final RecoverySettings recoverySettings = new RecoverySettings(
Settings.builder().put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), true).build(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
);
try (
ReplicationGroup shards = createGroup(
0,
getIndexSettings(),
indexMapping,
new NRTReplicationEngineFactory(),
recoverySettings,
MergedSegmentPublisher.EMPTY
)
) {
shards.startAll();
final IndexShard primaryShard = shards.getPrimary();

Expand All @@ -210,7 +285,20 @@ public void testMergedSegmentReplicationWithZeroReplica() throws Exception {

@LockFeatureFlag(MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG)
public void testCleanupRedundantPendingMergeSegment() throws Exception {
try (ReplicationGroup shards = createGroup(1, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory());) {
final RecoverySettings recoverySettings = new RecoverySettings(
Settings.builder().put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), true).build(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
);
try (
ReplicationGroup shards = createGroup(
1,
getIndexSettings(),
indexMapping,
new NRTReplicationEngineFactory(),
recoverySettings,
MergedSegmentPublisher.EMPTY
)
) {
shards.startAll();
final IndexShard primaryShard = shards.getPrimary();
final IndexShard replicaShard = shards.getReplicas().get(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,11 @@
import org.opensearch.index.shard.PrimaryReplicaSyncer;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.recovery.DefaultRecoverySettings;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.replication.checkpoint.MergedSegmentPublisher;
import org.opensearch.tasks.TaskManager;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.threadpool.ThreadPool.Names;
Expand Down Expand Up @@ -150,17 +153,48 @@ protected ReplicationGroup createGroup(int replicas, Settings settings, EngineFa

protected ReplicationGroup createGroup(int replicas, Settings settings, String mappings, EngineFactory engineFactory)
throws IOException {
return createGroup(replicas, settings, mappings, engineFactory, DefaultRecoverySettings.INSTANCE, MergedSegmentPublisher.EMPTY);
}

protected ReplicationGroup createGroup(
int replicas,
Settings settings,
String mappings,
EngineFactory engineFactory,
RecoverySettings recoverySettings,
MergedSegmentPublisher mergedSegmentPublisher
) throws IOException {
Path remotePath = null;
if ("true".equals(settings.get(IndexMetadata.SETTING_REMOTE_STORE_ENABLED))) {
remotePath = createTempDir();
}
return createGroup(replicas, settings, mappings, engineFactory, remotePath);
return createGroup(replicas, settings, mappings, engineFactory, recoverySettings, remotePath, mergedSegmentPublisher);
}

protected ReplicationGroup createGroup(int replicas, Settings settings, String mappings, EngineFactory engineFactory, Path remotePath)
throws IOException {
return createGroup(
replicas,
settings,
mappings,
engineFactory,
DefaultRecoverySettings.INSTANCE,
remotePath,
MergedSegmentPublisher.EMPTY
);
}

protected ReplicationGroup createGroup(
int replicas,
Settings settings,
String mappings,
EngineFactory engineFactory,
RecoverySettings recoverySettings,
Path remotePath,
MergedSegmentPublisher mergedSegmentPublisher
) throws IOException {
IndexMetadata metadata = buildIndexMetadata(replicas, settings, mappings);
return new ReplicationGroup(metadata, remotePath) {
return new ReplicationGroup(metadata, recoverySettings, remotePath, mergedSegmentPublisher) {
@Override
protected EngineFactory getEngineFactory(ShardRouting routing) {
return engineFactory;
Expand Down Expand Up @@ -253,6 +287,15 @@ protected ReplicationGroup(final IndexMetadata indexMetadata) throws IOException
}

protected ReplicationGroup(final IndexMetadata indexMetadata, Path remotePath) throws IOException {
this(indexMetadata, DefaultRecoverySettings.INSTANCE, remotePath, MergedSegmentPublisher.EMPTY);
}

protected ReplicationGroup(
final IndexMetadata indexMetadata,
RecoverySettings recoverySettings,
Path remotePath,
MergedSegmentPublisher mergedSegmentPublisher
) throws IOException {
final ShardRouting primaryRouting = this.createShardRouting("s0", true);
primary = newShard(
primaryRouting,
Expand All @@ -261,7 +304,9 @@ protected ReplicationGroup(final IndexMetadata indexMetadata, Path remotePath) t
getEngineFactory(primaryRouting),
() -> {},
retentionLeaseSyncer,
remotePath
recoverySettings,
remotePath,
mergedSegmentPublisher
);
replicas = new CopyOnWriteArrayList<>();
this.indexMetadata = indexMetadata;
Expand Down
Loading
Loading