
Commit e3fd031

astute-decipher authored and skumarp7 committed
Adding support to fetch changes from Lucene store while migrating from/to r… (opensearch-project#1369) (opensearch-project#1375)
Signed-off-by: Shubh Sahu <[email protected]>
1 parent 6f0ec08 commit e3fd031

File tree

5 files changed: +155 -89 lines changed

build.gradle

Lines changed: 2 additions & 0 deletions
@@ -381,6 +381,8 @@ testClusters {
         testDistribution = "ARCHIVE"
     }
     int debugPort = 5005
+    //adding it to test migration
+    systemProperty('opensearch.experimental.feature.remote_store.migration.enabled','true')
 
     if (_numNodes > 1) numberOfNodes = _numNodes
     //numberOfNodes = 3
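The two added lines opt the plugin's Gradle test clusters into the experimental remote-store migration feature flag, so integration tests exercise the migration path. As a rough illustration only (not part of the change), a helper like the following could read the flag back from system properties; the helper name is hypothetical, and note that systemProperty in testClusters is applied to the test cluster nodes' JVMs, so such a check is only meaningful on a JVM that actually received the property.

// Hypothetical helper (illustration only): read back the experimental migration flag.
fun migrationFeatureFlagEnabled(): Boolean =
    System.getProperty(
        "opensearch.experimental.feature.remote_store.migration.enabled",
        "false"
    ).toBoolean()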

src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt

Lines changed: 1 addition & 1 deletion
@@ -387,7 +387,7 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
     override fun getCustomTranslogDeletionPolicyFactory(): Optional<TranslogDeletionPolicyFactory> {
         // We don't need a retention lease translog deletion policy for remote store enabled clusters as
         // we fetch the operations directly from lucene in such cases.
-        return if (ValidationUtil.isRemoteStoreEnabledCluster(clusterService) == false) {
+        return if (ValidationUtil.isRemoteEnabledOrMigrating(clusterService) == false) {
             Optional.of(TranslogDeletionPolicyFactory { indexSettings, retentionLeasesSupplier ->
                 ReplicationTranslogDeletionPolicy(indexSettings, retentionLeasesSupplier)
             })
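The only change in this hunk is the guard: the custom retention-lease translog deletion policy is now skipped not just for remote-store clusters but also for clusters migrating to or from remote store, because in both cases changes are fetched from Lucene rather than replayed from the translog. A minimal standalone sketch of that decision, with a simplified name that is not the plugin's actual API:

// Sketch only: keep a retention-lease translog deletion policy only when the changes
// API will actually read history from the translog.
fun needsReplicationTranslogPolicy(remoteEnabledOrMigrating: Boolean): Boolean =
    !remoteEnabledOrMigrating // remote-store and migrating clusters read history from Lucene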

src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt

Lines changed: 10 additions & 10 deletions
@@ -77,8 +77,8 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
         indexMetric.lastFetchTime.set(relativeStartNanos)
 
         val indexShard = indicesService.indexServiceSafe(shardId.index).getShard(shardId.id)
-        val isRemoteStoreEnabled = ValidationUtil.isRemoteStoreEnabledCluster(clusterService)
-        if (lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled) < request.fromSeqNo) {
+        val isRemoteEnabledOrMigrating = ValidationUtil.isRemoteEnabledOrMigrating(clusterService)
+        if (lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating) < request.fromSeqNo) {
             // There are no new operations to sync. Do a long poll and wait for GlobalCheckpoint to advance. If
             // the checkpoint doesn't advance by the timeout this throws an ESTimeoutException which the caller
             // should catch and start a new poll.
@@ -87,18 +87,18 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
             // At this point indexShard.lastKnownGlobalCheckpoint has advanced but it may not yet have been synced
             // to the translog, which means we can't return those changes. Return to the caller to retry.
             // TODO: Figure out a better way to wait for the global checkpoint to be synced to the translog
-            if (lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled) < request.fromSeqNo) {
-                assert(gcp > lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled)) { "Checkpoint didn't advance at all $gcp ${lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled)}" }
+            if (lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating) < request.fromSeqNo) {
+                assert(gcp > lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating)) { "Checkpoint didn't advance at all $gcp ${lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating)}" }
                 throw OpenSearchTimeoutException("global checkpoint not synced. Retry after a few miliseconds...")
             }
         }
 
         relativeStartNanos = System.nanoTime()
         // At this point lastSyncedGlobalCheckpoint is at least fromSeqNo
-        val toSeqNo = min(lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled), request.toSeqNo)
+        val toSeqNo = min(lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating), request.toSeqNo)
 
         var ops: List<Translog.Operation> = listOf()
-        var fetchFromTranslog = isTranslogPruningByRetentionLeaseEnabled(shardId) && isRemoteStoreEnabled == false
+        var fetchFromTranslog = isTranslogPruningByRetentionLeaseEnabled(shardId) && isRemoteEnabledOrMigrating == false
         if(fetchFromTranslog) {
             try {
                 ops = translogService.getHistoryOfOperations(indexShard, request.fromSeqNo, toSeqNo)
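The hunk above reduces to a single predicate: operations are served from the translog only when translog pruning by retention lease is enabled and the cluster is neither remote-store enabled nor in the mixed (migrating) state; in every other case, including a failure on the translog path, history is read from Lucene. A standalone restatement of that predicate, with an illustrative, simplified signature:

// Illustrative only: the fetch-path decision with its two inputs made explicit.
fun shouldFetchFromTranslog(
    translogPruningByRetentionLease: Boolean,
    remoteEnabledOrMigrating: Boolean
): Boolean = translogPruningByRetentionLease && !remoteEnabledOrMigrating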
@@ -136,16 +136,16 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
                 indexMetric.ops.addAndGet(ops.size.toLong())
 
                 ops.stream().forEach{op -> indexMetric.bytesRead.addAndGet(op.estimateSize()) }
-                GetChangesResponse(ops, request.fromSeqNo, indexShard.maxSeqNoOfUpdatesOrDeletes, lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled))
+                GetChangesResponse(ops, request.fromSeqNo, indexShard.maxSeqNoOfUpdatesOrDeletes, lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating))
             }
         }
     }
 
-    private fun lastGlobalCheckpoint(indexShard: IndexShard, isRemoteStoreEnabled: Boolean): Long {
+    private fun lastGlobalCheckpoint(indexShard: IndexShard, isRemoteEnabledOrMigrating: Boolean): Long {
         // We rely on lastSyncedGlobalCheckpoint as it has been durably written to disk. In case of remote store
         // enabled clusters, the semantics are slightly different, and we can't use lastSyncedGlobalCheckpoint. Falling back to
         // lastKnownGlobalCheckpoint in such cases.
-        return if (isRemoteStoreEnabled) {
+        return if (isRemoteEnabledOrMigrating) {
             indexShard.lastKnownGlobalCheckpoint
         } else {
             indexShard.lastSyncedGlobalCheckpoint
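lastGlobalCheckpoint now keys off the broader remote-or-migrating flag: lastSyncedGlobalCheckpoint is preferred because it has been durably written to the local translog, but on remote-store (and now migrating) clusters that value cannot be relied on, so lastKnownGlobalCheckpoint is used instead. A self-contained sketch of the same selection, using a hypothetical stand-in for IndexShard:

// Hypothetical stand-in for the parts of IndexShard this decision touches.
interface ShardCheckpoints {
    val lastKnownGlobalCheckpoint: Long   // in-memory view of the global checkpoint
    val lastSyncedGlobalCheckpoint: Long  // checkpoint durably synced to the local translog
}

// Sketch of the selection made by lastGlobalCheckpoint(...) above.
fun lastGlobalCheckpoint(shard: ShardCheckpoints, remoteEnabledOrMigrating: Boolean): Long =
    if (remoteEnabledOrMigrating) shard.lastKnownGlobalCheckpoint
    else shard.lastSyncedGlobalCheckpoint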
@@ -173,7 +173,7 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
     override fun shards(state: ClusterState, request: InternalRequest): ShardsIterator {
         val shardIt = state.routingTable().shardRoutingTable(request.request().shardId)
         // Random active shards
-        return if (ValidationUtil.isRemoteStoreEnabledCluster(clusterService)) shardIt.primaryShardIt()
+        return if (ValidationUtil.isRemoteEnabledOrMigrating(clusterService)) shardIt.primaryShardIt()
         else shardIt.activeInitializingShardsRandomIt()
     }
 }
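The routing change follows the same pattern: on a remote-store or migrating cluster the GetChanges request is pinned to the primary shard, presumably because replica copies there cannot be assumed to hold the required operation history, while document-replication clusters can still spread reads across any active copy. A hypothetical, simplified restatement of the rule:

// Hypothetical types; not the plugin's routing API.
enum class ShardCopy { PRIMARY, ANY_ACTIVE }

fun copyToRead(remoteEnabledOrMigrating: Boolean): ShardCopy =
    if (remoteEnabledOrMigrating) ShardCopy.PRIMARY else ShardCopy.ANY_ACTIVE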

src/main/kotlin/org/opensearch/replication/util/ValidationUtil.kt

Lines changed: 8 additions & 4 deletions
@@ -17,18 +17,18 @@ import org.opensearch.Version
 import org.opensearch.cluster.ClusterState
 import org.opensearch.cluster.metadata.IndexMetadata
 import org.opensearch.cluster.metadata.MetadataCreateIndexService
-import org.opensearch.core.common.Strings
+import org.opensearch.cluster.service.ClusterService
 import org.opensearch.common.ValidationException
 import org.opensearch.common.settings.Settings
+import org.opensearch.core.common.Strings
 import org.opensearch.env.Environment
 import org.opensearch.index.IndexNotFoundException
-import java.io.UnsupportedEncodingException
-import org.opensearch.cluster.service.ClusterService
 import org.opensearch.node.Node
 import org.opensearch.node.remotestore.RemoteStoreNodeAttribute
+import org.opensearch.node.remotestore.RemoteStoreNodeService
 import org.opensearch.replication.ReplicationPlugin.Companion.KNN_INDEX_SETTING
 import org.opensearch.replication.ReplicationPlugin.Companion.KNN_PLUGIN_PRESENT_SETTING
-import org.opensearch.replication.action.changes.TransportGetChangesAction
+import java.io.UnsupportedEncodingException
 import java.nio.file.Files
 import java.nio.file.Path
 import java.util.Locale
@@ -161,4 +161,8 @@ object ValidationUtil {
         return clusterService.settings.getByPrefix(Node.NODE_ATTRIBUTES.key + RemoteStoreNodeAttribute.REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX).isEmpty == false
     }
 
+    fun isRemoteEnabledOrMigrating(clusterService: ClusterService): Boolean {
+        return isRemoteStoreEnabledCluster(clusterService) ||
+                clusterService.clusterSettings.get(RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING).equals(RemoteStoreNodeService.CompatibilityMode.MIXED)
+    }
 }
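isRemoteEnabledOrMigrating is the core addition of this commit: a cluster is treated as "remote" either when remote-store node attributes are present or when the cluster-level compatibility mode setting (REMOTE_STORE_COMPATIBILITY_MODE_SETTING) is MIXED, i.e. a document-replication cluster that is mid-migration. A self-contained sketch of the predicate, with the OpenSearch types replaced by stand-ins so it can be read and run in isolation:

// Stand-in for RemoteStoreNodeService.CompatibilityMode (values mirror the real enum).
enum class CompatibilityMode { STRICT, MIXED }

// Sketch of the new predicate: remote-store clusters and migrating (MIXED-mode) clusters
// are treated identically by the replication plugin.
fun isRemoteEnabledOrMigrating(remoteStoreEnabled: Boolean, compatibilityMode: CompatibilityMode): Boolean =
    remoteStoreEnabled || compatibilityMode == CompatibilityMode.MIXED

fun main() {
    // A document-replication cluster that has entered MIXED mode behaves like a remote cluster.
    check(isRemoteEnabledOrMigrating(remoteStoreEnabled = false, compatibilityMode = CompatibilityMode.MIXED))
    check(!isRemoteEnabledOrMigrating(remoteStoreEnabled = false, compatibilityMode = CompatibilityMode.STRICT))
}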
