Skip to content

Commit 3ab843d

Browse files
author
Harsh Garg
committed
Adding version checks to remote entities using bytestream ser/de
Signed-off-by: Harsh Garg <[email protected]>
1 parent f0a1dba commit 3ab843d

20 files changed

+214
-97
lines changed

server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
1313
import org.apache.logging.log4j.message.ParameterizedMessage;
14+
import org.opensearch.Version;
1415
import org.opensearch.action.LatchedActionListener;
1516
import org.opensearch.cluster.Diff;
1617
import org.opensearch.cluster.routing.IndexRoutingTable;
@@ -182,15 +183,16 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndices
182183
public void getAsyncIndexRoutingReadAction(
183184
String clusterUUID,
184185
String uploadedFilename,
185-
LatchedActionListener<IndexRoutingTable> latchedActionListener
186+
LatchedActionListener<IndexRoutingTable> latchedActionListener,
187+
Version version
186188
) {
187189

188190
ActionListener<IndexRoutingTable> actionListener = ActionListener.wrap(
189191
latchedActionListener::onResponse,
190192
latchedActionListener::onFailure
191193
);
192194

193-
RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(uploadedFilename, clusterUUID, compressor);
195+
RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(uploadedFilename, clusterUUID, compressor, version);
194196

195197
remoteIndexRoutingTableStore.readAsync(remoteIndexRoutingTable, actionListener);
196198
}
@@ -199,14 +201,15 @@ public void getAsyncIndexRoutingReadAction(
199201
public void getAsyncIndexRoutingTableDiffReadAction(
200202
String clusterUUID,
201203
String uploadedFilename,
202-
LatchedActionListener<Diff<RoutingTable>> latchedActionListener
204+
LatchedActionListener<Diff<RoutingTable>> latchedActionListener,
205+
Version version
203206
) {
204207
ActionListener<Diff<RoutingTable>> actionListener = ActionListener.wrap(
205208
latchedActionListener::onResponse,
206209
latchedActionListener::onFailure
207210
);
208211

209-
RemoteRoutingTableDiff remoteRoutingTableDiff = new RemoteRoutingTableDiff(uploadedFilename, clusterUUID, compressor);
212+
RemoteRoutingTableDiff remoteRoutingTableDiff = new RemoteRoutingTableDiff(uploadedFilename, clusterUUID, compressor, version);
210213
remoteRoutingTableDiffStore.readAsync(remoteRoutingTableDiff, actionListener);
211214
}
212215

server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.cluster.routing.remote;
1010

11+
import org.opensearch.Version;
1112
import org.opensearch.action.LatchedActionListener;
1213
import org.opensearch.cluster.Diff;
1314
import org.opensearch.cluster.routing.IndexRoutingTable;
@@ -71,7 +72,8 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndices
7172
public void getAsyncIndexRoutingReadAction(
7273
String clusterUUID,
7374
String uploadedFilename,
74-
LatchedActionListener<IndexRoutingTable> latchedActionListener
75+
LatchedActionListener<IndexRoutingTable> latchedActionListener,
76+
Version version
7577
) {
7678
// noop
7779
}
@@ -80,7 +82,8 @@ public void getAsyncIndexRoutingReadAction(
8082
public void getAsyncIndexRoutingTableDiffReadAction(
8183
String clusterUUID,
8284
String uploadedFilename,
83-
LatchedActionListener<Diff<RoutingTable>> latchedActionListener
85+
LatchedActionListener<Diff<RoutingTable>> latchedActionListener,
86+
Version version
8487
) {
8588
// noop
8689
}

server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.cluster.routing.remote;
1010

11+
import org.opensearch.Version;
1112
import org.opensearch.action.LatchedActionListener;
1213
import org.opensearch.cluster.Diff;
1314
import org.opensearch.cluster.routing.IndexRoutingTable;
@@ -31,13 +32,15 @@ public interface RemoteRoutingTableService extends LifecycleComponent {
3132
void getAsyncIndexRoutingReadAction(
3233
String clusterUUID,
3334
String uploadedFilename,
34-
LatchedActionListener<IndexRoutingTable> latchedActionListener
35+
LatchedActionListener<IndexRoutingTable> latchedActionListener,
36+
Version version
3537
);
3638

3739
void getAsyncIndexRoutingTableDiffReadAction(
3840
String clusterUUID,
3941
String uploadedFilename,
40-
LatchedActionListener<Diff<RoutingTable>> latchedActionListener
42+
LatchedActionListener<Diff<RoutingTable>> latchedActionListener,
43+
Version version
4144
);
4245

4346
List<ClusterMetadataManifest.UploadedIndexMetadata> getUpdatedIndexRoutingTableMetadata(

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1296,7 +1296,8 @@ ClusterState readClusterStateInParallel(
12961296
remoteRoutingTableService.getAsyncIndexRoutingReadAction(
12971297
clusterUUID,
12981298
indexRouting.getUploadedFilename(),
1299-
routingTableLatchedActionListener
1299+
routingTableLatchedActionListener,
1300+
manifest.getOpensearchVersion()
13001301
);
13011302
}
13021303

@@ -1315,7 +1316,8 @@ ClusterState readClusterStateInParallel(
13151316
remoteRoutingTableService.getAsyncIndexRoutingTableDiffReadAction(
13161317
clusterUUID,
13171318
manifest.getDiffManifest().getIndicesRoutingDiffPath(),
1318-
routingTableDiffLatchedActionListener
1319+
routingTableDiffLatchedActionListener,
1320+
manifest.getOpensearchVersion()
13191321
);
13201322
}
13211323

@@ -1392,7 +1394,8 @@ ClusterState readClusterStateInParallel(
13921394
new RemoteDiscoveryNodes(
13931395
manifest.getDiscoveryNodesMetadata().getUploadedFilename(),
13941396
clusterUUID,
1395-
blobStoreRepository.getCompressor()
1397+
blobStoreRepository.getCompressor(),
1398+
manifest.getOpensearchVersion()
13961399
),
13971400
listener
13981401
);
@@ -1404,7 +1407,8 @@ ClusterState readClusterStateInParallel(
14041407
new RemoteClusterBlocks(
14051408
manifest.getClusterBlocksMetadata().getUploadedFilename(),
14061409
clusterUUID,
1407-
blobStoreRepository.getCompressor()
1410+
blobStoreRepository.getCompressor(),
1411+
manifest.getOpensearchVersion()
14081412
),
14091413
listener
14101414
);
@@ -1416,7 +1420,8 @@ ClusterState readClusterStateInParallel(
14161420
new RemoteHashesOfConsistentSettings(
14171421
manifest.getHashesOfConsistentSettings().getUploadedFilename(),
14181422
clusterUUID,
1419-
blobStoreRepository.getCompressor()
1423+
blobStoreRepository.getCompressor(),
1424+
manifest.getOpensearchVersion()
14201425
),
14211426
listener
14221427
);
@@ -1431,7 +1436,8 @@ ClusterState readClusterStateInParallel(
14311436
entry.getValue().getAttributeName(),
14321437
clusterUUID,
14331438
blobStoreRepository.getCompressor(),
1434-
namedWriteableRegistry
1439+
namedWriteableRegistry,
1440+
manifest.getOpensearchVersion()
14351441
),
14361442
listener
14371443
);

server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocks.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.gateway.remote.model;
1010

11+
import org.opensearch.Version;
1112
import org.opensearch.cluster.block.ClusterBlocks;
1213
import org.opensearch.common.io.Streams;
1314
import org.opensearch.common.remote.AbstractClusterMetadataWriteableBlobEntity;
@@ -32,10 +33,7 @@
3233
public class RemoteClusterBlocks extends AbstractClusterMetadataWriteableBlobEntity<ClusterBlocks> {
3334

3435
public static final String CLUSTER_BLOCKS = "blocks";
35-
public static final ChecksumWritableBlobStoreFormat<ClusterBlocks> CLUSTER_BLOCKS_FORMAT = new ChecksumWritableBlobStoreFormat<>(
36-
"blocks",
37-
ClusterBlocks::readFrom
38-
);
36+
public final ChecksumWritableBlobStoreFormat<ClusterBlocks> clusterBlocksFormat;
3937

4038
private ClusterBlocks clusterBlocks;
4139
private long stateVersion;
@@ -44,11 +42,16 @@ public RemoteClusterBlocks(final ClusterBlocks clusterBlocks, long stateVersion,
4442
super(clusterUUID, compressor, null);
4543
this.clusterBlocks = clusterBlocks;
4644
this.stateVersion = stateVersion;
45+
this.clusterBlocksFormat = new ChecksumWritableBlobStoreFormat<>("blocks", ClusterBlocks::readFrom);
4746
}
4847

49-
public RemoteClusterBlocks(final String blobName, final String clusterUUID, final Compressor compressor) {
48+
public RemoteClusterBlocks(final String blobName, final String clusterUUID, final Compressor compressor, final Version version) {
5049
super(clusterUUID, compressor, null);
5150
this.blobName = blobName;
51+
this.clusterBlocksFormat = new ChecksumWritableBlobStoreFormat<>("blocks", is -> {
52+
is.setVersion(version);
53+
return ClusterBlocks.readFrom(is);
54+
});
5255
}
5356

5457
@Override
@@ -83,11 +86,11 @@ public UploadedMetadata getUploadedMetadata() {
8386

8487
@Override
8588
public InputStream serialize() throws IOException {
86-
return CLUSTER_BLOCKS_FORMAT.serialize(clusterBlocks, generateBlobFileName(), getCompressor()).streamInput();
89+
return this.clusterBlocksFormat.serialize(clusterBlocks, generateBlobFileName(), getCompressor()).streamInput();
8790
}
8891

8992
@Override
9093
public ClusterBlocks deserialize(final InputStream inputStream) throws IOException {
91-
return CLUSTER_BLOCKS_FORMAT.deserialize(blobName, Streams.readFully(inputStream));
94+
return this.clusterBlocksFormat.deserialize(blobName, Streams.readFully(inputStream));
9295
}
9396
}

server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.gateway.remote.model;
1010

11+
import org.opensearch.Version;
1112
import org.opensearch.cluster.ClusterState;
1213
import org.opensearch.cluster.ClusterState.Custom;
1314
import org.opensearch.common.io.Streams;
@@ -65,16 +66,17 @@ public RemoteClusterStateCustoms(
6566
final String customType,
6667
final String clusterUUID,
6768
final Compressor compressor,
68-
final NamedWriteableRegistry namedWriteableRegistry
69+
final NamedWriteableRegistry namedWriteableRegistry,
70+
final Version version
6971
) {
7072
super(clusterUUID, compressor, null);
7173
this.blobName = blobName;
7274
this.customType = customType;
7375
this.namedWriteableRegistry = namedWriteableRegistry;
74-
this.clusterStateCustomsFormat = new ChecksumWritableBlobStoreFormat<>(
75-
"cluster-state-custom",
76-
is -> readFrom(is, namedWriteableRegistry, customType)
77-
);
76+
this.clusterStateCustomsFormat = new ChecksumWritableBlobStoreFormat<>("cluster-state-custom", is -> {
77+
is.setVersion(version);
78+
return readFrom(is, namedWriteableRegistry, customType);
79+
});
7880
}
7981

8082
@Override

server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.gateway.remote.model;
1010

11+
import org.opensearch.Version;
1112
import org.opensearch.cluster.node.DiscoveryNodes;
1213
import org.opensearch.common.io.Streams;
1314
import org.opensearch.common.remote.AbstractClusterMetadataWriteableBlobEntity;
@@ -32,10 +33,7 @@
3233
public class RemoteDiscoveryNodes extends AbstractClusterMetadataWriteableBlobEntity<DiscoveryNodes> {
3334

3435
public static final String DISCOVERY_NODES = "nodes";
35-
public static final ChecksumWritableBlobStoreFormat<DiscoveryNodes> DISCOVERY_NODES_FORMAT = new ChecksumWritableBlobStoreFormat<>(
36-
"nodes",
37-
is -> DiscoveryNodes.readFrom(is, null)
38-
);
36+
public final ChecksumWritableBlobStoreFormat<DiscoveryNodes> discoveryNodesFormat;
3937

4038
private DiscoveryNodes discoveryNodes;
4139
private long stateVersion;
@@ -49,11 +47,16 @@ public RemoteDiscoveryNodes(
4947
super(clusterUUID, compressor, null);
5048
this.discoveryNodes = discoveryNodes;
5149
this.stateVersion = stateVersion;
50+
this.discoveryNodesFormat = new ChecksumWritableBlobStoreFormat<>("nodes", is -> DiscoveryNodes.readFrom(is, null));
5251
}
5352

54-
public RemoteDiscoveryNodes(final String blobName, final String clusterUUID, final Compressor compressor) {
53+
public RemoteDiscoveryNodes(final String blobName, final String clusterUUID, final Compressor compressor, final Version version) {
5554
super(clusterUUID, compressor, null);
5655
this.blobName = blobName;
56+
this.discoveryNodesFormat = new ChecksumWritableBlobStoreFormat<>("nodes", is -> {
57+
is.setVersion(version);
58+
return DiscoveryNodes.readFrom(is, null);
59+
});
5760
}
5861

5962
@Override
@@ -88,7 +91,7 @@ public UploadedMetadata getUploadedMetadata() {
8891

8992
@Override
9093
public InputStream serialize() throws IOException {
91-
return DISCOVERY_NODES_FORMAT.serialize(
94+
return discoveryNodesFormat.serialize(
9295
(out, discoveryNode) -> discoveryNode.writeToWithAttribute(out),
9396
discoveryNodes,
9497
generateBlobFileName(),
@@ -98,6 +101,6 @@ public InputStream serialize() throws IOException {
98101

99102
@Override
100103
public DiscoveryNodes deserialize(final InputStream inputStream) throws IOException {
101-
return DISCOVERY_NODES_FORMAT.deserialize(blobName, Streams.readFully(inputStream));
104+
return discoveryNodesFormat.deserialize(blobName, Streams.readFully(inputStream));
102105
}
103106
}

server/src/main/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettings.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.gateway.remote.model;
1010

11+
import org.opensearch.Version;
1112
import org.opensearch.cluster.metadata.DiffableStringMap;
1213
import org.opensearch.common.io.Streams;
1314
import org.opensearch.common.remote.AbstractClusterMetadataWriteableBlobEntity;
@@ -30,8 +31,7 @@
3031
*/
3132
public class RemoteHashesOfConsistentSettings extends AbstractClusterMetadataWriteableBlobEntity<DiffableStringMap> {
3233
public static final String HASHES_OF_CONSISTENT_SETTINGS = "hashes-of-consistent-settings";
33-
public static final ChecksumWritableBlobStoreFormat<DiffableStringMap> HASHES_OF_CONSISTENT_SETTINGS_FORMAT =
34-
new ChecksumWritableBlobStoreFormat<>("hashes-of-consistent-settings", DiffableStringMap::readFrom);
34+
public final ChecksumWritableBlobStoreFormat<DiffableStringMap> hashesOfConsistentSettingsFormat;
3535

3636
private DiffableStringMap hashesOfConsistentSettings;
3737
private long metadataVersion;
@@ -45,11 +45,24 @@ public RemoteHashesOfConsistentSettings(
4545
super(clusterUUID, compressor, null);
4646
this.metadataVersion = metadataVersion;
4747
this.hashesOfConsistentSettings = hashesOfConsistentSettings;
48+
this.hashesOfConsistentSettingsFormat = new ChecksumWritableBlobStoreFormat<>(
49+
"hashes-of-consistent-settings",
50+
DiffableStringMap::readFrom
51+
);
4852
}
4953

50-
public RemoteHashesOfConsistentSettings(final String blobName, final String clusterUUID, final Compressor compressor) {
54+
public RemoteHashesOfConsistentSettings(
55+
final String blobName,
56+
final String clusterUUID,
57+
final Compressor compressor,
58+
final Version version
59+
) {
5160
super(clusterUUID, compressor, null);
5261
this.blobName = blobName;
62+
this.hashesOfConsistentSettingsFormat = new ChecksumWritableBlobStoreFormat<>("hashes-of-consistent-settings", is -> {
63+
is.setVersion(version);
64+
return DiffableStringMap.readFrom(is);
65+
});
5366
}
5467

5568
@Override
@@ -83,12 +96,12 @@ public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() {
8396

8497
@Override
8598
public InputStream serialize() throws IOException {
86-
return HASHES_OF_CONSISTENT_SETTINGS_FORMAT.serialize(hashesOfConsistentSettings, generateBlobFileName(), getCompressor())
99+
return hashesOfConsistentSettingsFormat.serialize(hashesOfConsistentSettings, generateBlobFileName(), getCompressor())
87100
.streamInput();
88101
}
89102

90103
@Override
91104
public DiffableStringMap deserialize(final InputStream inputStream) throws IOException {
92-
return HASHES_OF_CONSISTENT_SETTINGS_FORMAT.deserialize(blobName, Streams.readFully(inputStream));
105+
return hashesOfConsistentSettingsFormat.deserialize(blobName, Streams.readFully(inputStream));
93106
}
94107
}

0 commit comments

Comments
 (0)