Skip to content

Commit 41de135

Browse files
author
Uday Bhaskar
committed
add index-level-encryption support for snapshots and remote-store
Signed-off-by: Uday Bhaskar <[email protected]>
1 parent bd24685 commit 41de135

File tree

43 files changed

+1614
-134
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+1614
-134
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3333
- Allow the truncate filter in normalizers ([#19778](https://github.com/opensearch-project/OpenSearch/issues/19778))
3434
- Support pull-based ingestion message mappers and raw payload support ([#19765](https://github.com/opensearch-project/OpenSearch/pull/19765))
3535
- Support dynamic consumer configuration update in pull-based ingestion ([#19963](https://github.com/opensearch-project/OpenSearch/pull/19963))
36+
- Add index-level-encryption support for snapshots and remote-store ([#20095](https://github.com/opensearch-project/OpenSearch/pull/20095))
3637

3738
### Changed
3839
- Faster `terms` query creation for `keyword` field with index and docValues enabled ([#19350](https://github.com/opensearch-project/OpenSearch/pull/19350))

plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureBlobContainer.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.opensearch.action.ActionRunnable;
4141
import org.opensearch.action.support.GroupedActionListener;
4242
import org.opensearch.action.support.PlainActionFuture;
43+
import org.opensearch.cluster.metadata.CryptoMetadata;
4344
import org.opensearch.common.Nullable;
4445
import org.opensearch.common.blobstore.BlobContainer;
4546
import org.opensearch.common.blobstore.BlobMetadata;
@@ -136,6 +137,18 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
136137
}
137138
}
138139

140+
@Override
141+
public void writeBlobWithMetadata(
142+
String blobName,
143+
InputStream inputStream,
144+
long blobSize,
145+
boolean failIfAlreadyExists,
146+
@Nullable Map<String, String> metadata,
147+
@Nullable CryptoMetadata cryptoMetadata
148+
) throws IOException {
149+
writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
150+
}
151+
139152
@Override
140153
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
141154
writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

Lines changed: 50 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@
6363
import org.apache.logging.log4j.Logger;
6464
import org.apache.logging.log4j.message.ParameterizedMessage;
6565
import org.opensearch.action.support.PlainActionFuture;
66+
import org.opensearch.cluster.metadata.CryptoMetadata;
67+
import org.opensearch.cluster.metadata.KmsCryptoMetadata;
6668
import org.opensearch.common.Nullable;
6769
import org.opensearch.common.SetOnce;
6870
import org.opensearch.common.StreamContext;
@@ -90,6 +92,7 @@
9092
import org.opensearch.repositories.s3.async.SizeBasedBlockingQ;
9193
import org.opensearch.repositories.s3.async.UploadRequest;
9294
import org.opensearch.repositories.s3.utils.HttpRangeUtils;
95+
import org.opensearch.repositories.s3.utils.SseKmsUtil;
9396
import org.opensearch.secure_sm.AccessController;
9497

9598
import java.io.BufferedInputStream;
@@ -192,7 +195,7 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
192195
}
193196

194197
/**
195-
* Write blob with its object metadata.
198+
* Write blob with its object metadata and optional encryption settings.
196199
*/
197200
@ExperimentalApi
198201
@Override
@@ -201,20 +204,52 @@ public void writeBlobWithMetadata(
201204
InputStream inputStream,
202205
long blobSize,
203206
boolean failIfAlreadyExists,
204-
@Nullable Map<String, String> metadata
207+
@Nullable Map<String, String> metadata,
208+
@Nullable CryptoMetadata cryptoMetadata
205209
) throws IOException {
206210
assert inputStream.markSupported() : "No mark support on inputStream breaks the S3 SDK's ability to retry requests";
207211
AccessController.doPrivilegedChecked(() -> {
208212
if (blobSize <= getLargeBlobThresholdInBytes()) {
209-
executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata);
213+
executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata, cryptoMetadata);
210214
} else {
211-
executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata);
215+
executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata, cryptoMetadata);
212216
}
213217
});
214218
}
215219

220+
/**
221+
* Write blob with its object metadata.
222+
*/
223+
@ExperimentalApi
224+
@Override
225+
public void writeBlobWithMetadata(
226+
String blobName,
227+
InputStream inputStream,
228+
long blobSize,
229+
boolean failIfAlreadyExists,
230+
@Nullable Map<String, String> metadata
231+
) throws IOException {
232+
// Delegate to crypto-aware version with null CryptoMetadata
233+
writeBlobWithMetadata(blobName, inputStream, blobSize, failIfAlreadyExists, metadata, null);
234+
}
235+
216236
@Override
217237
public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> completionListener) throws IOException {
238+
CryptoMetadata crypto = writeContext.getCryptoMetadata();
239+
240+
String indexKmsKey = null;
241+
String indexEncContext = null;
242+
243+
if (crypto instanceof KmsCryptoMetadata) {
244+
KmsCryptoMetadata kmsMetadata = (KmsCryptoMetadata) crypto;
245+
indexKmsKey = kmsMetadata.getKmsKeyArn().orElse(null);
246+
indexEncContext = kmsMetadata.getKmsEncryptionContext().orElse(null);
247+
}
248+
String mergeEncContext = SseKmsUtil.mergeAndEncodeEncryptionContexts(
249+
indexEncContext,
250+
blobStore.serverSideEncryptionEncryptionContext()
251+
);
252+
218253
UploadRequest uploadRequest = new UploadRequest(
219254
blobStore.bucket(),
220255
buildKey(writeContext.getFileName()),
@@ -226,9 +261,9 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
226261
blobStore.isUploadRetryEnabled(),
227262
writeContext.getMetadata(),
228263
blobStore.serverSideEncryptionType(),
229-
blobStore.serverSideEncryptionKmsKey(),
264+
indexKmsKey != null ? indexKmsKey : blobStore.serverSideEncryptionKmsKey(),
230265
blobStore.serverSideEncryptionBucketKey(),
231-
blobStore.serverSideEncryptionEncryptionContext(),
266+
mergeEncContext,
232267
blobStore.expectedBucketOwner()
233268
);
234269
try {
@@ -250,7 +285,8 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
250285
uploadRequest.getKey(),
251286
inputStream.getInputStream(),
252287
uploadRequest.getContentLength(),
253-
uploadRequest.getMetadata()
288+
uploadRequest.getMetadata(),
289+
crypto
254290
);
255291
completionListener.onResponse(null);
256292
} catch (Exception ex) {
@@ -536,7 +572,8 @@ void executeSingleUpload(
536572
final String blobName,
537573
final InputStream input,
538574
final long blobSize,
539-
final Map<String, String> metadata
575+
final Map<String, String> metadata,
576+
@Nullable CryptoMetadata cryptoMetadata
540577
) throws IOException {
541578

542579
// Extra safety checks
@@ -559,7 +596,8 @@ void executeSingleUpload(
559596
if (CollectionUtils.isNotEmpty(metadata)) {
560597
putObjectRequestBuilder = putObjectRequestBuilder.metadata(metadata);
561598
}
562-
configureEncryptionSettings(putObjectRequestBuilder, blobStore);
599+
600+
configureEncryptionSettings(putObjectRequestBuilder, blobStore, cryptoMetadata);
563601

564602
PutObjectRequest putObjectRequest = putObjectRequestBuilder.build();
565603
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
@@ -585,7 +623,8 @@ void executeMultipartUpload(
585623
final String blobName,
586624
final InputStream input,
587625
final long blobSize,
588-
final Map<String, String> metadata
626+
final Map<String, String> metadata,
627+
@Nullable CryptoMetadata cryptoMetadata
589628
) throws IOException {
590629

591630
ensureMultiPartUploadSize(blobSize);
@@ -616,7 +655,7 @@ void executeMultipartUpload(
616655
createMultipartUploadRequestBuilder.metadata(metadata);
617656
}
618657

619-
configureEncryptionSettings(createMultipartUploadRequestBuilder, blobStore);
658+
configureEncryptionSettings(createMultipartUploadRequestBuilder, blobStore, cryptoMetadata);
620659

621660
final InputStream requestInputStream;
622661
if (blobStore.isUploadRetryEnabled()) {

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/utils/SseKmsUtil.java

Lines changed: 61 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,33 +12,88 @@
1212
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
1313
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
1414

15+
import org.opensearch.cluster.metadata.CryptoMetadata;
16+
import org.opensearch.cluster.metadata.KmsCryptoMetadata;
17+
import org.opensearch.common.Nullable;
1518
import org.opensearch.repositories.s3.S3BlobStore;
1619
import org.opensearch.repositories.s3.async.UploadRequest;
1720

1821
public class SseKmsUtil {
22+
/**
23+
* Merges index-level and repository-level encryption contexts, converts to JSON format if needed,
24+
* and Base64 encodes for S3.
25+
* <p>
26+
* Delegates to centralized EncryptionContextUtils for consistent behavior.
27+
*
28+
* @param indexEncContext Index-level encryption context - can be cryptofs or JSON format
29+
* @param repoEncContext Repository-level encryption context - already Base64 encoded JSON
30+
* @return Base64 encoded merged JSON encryption context, or null if both are null
31+
*/
32+
public static String mergeAndEncodeEncryptionContexts(@Nullable String indexEncContext, @Nullable String repoEncContext) {
33+
return org.opensearch.repositories.blobstore.EncryptionContextUtils.mergeAndEncodeEncryptionContexts(
34+
indexEncContext,
35+
repoEncContext
36+
);
37+
}
1938

20-
public static void configureEncryptionSettings(CreateMultipartUploadRequest.Builder builder, S3BlobStore blobStore) {
39+
public static void configureEncryptionSettings(
40+
CreateMultipartUploadRequest.Builder builder,
41+
S3BlobStore blobStore,
42+
@Nullable CryptoMetadata cryptoMetadata
43+
) {
2144
if (blobStore.serverSideEncryptionType().equals(ServerSideEncryption.AES256.toString())) {
2245
builder.serverSideEncryption(ServerSideEncryption.AES256);
2346
} else if (blobStore.serverSideEncryptionType().equals(ServerSideEncryption.AWS_KMS.toString())) {
47+
String indexKmsKey = null;
48+
String indexEncContext = null;
49+
50+
if (cryptoMetadata instanceof KmsCryptoMetadata) {
51+
KmsCryptoMetadata kmsMetadata = (KmsCryptoMetadata) cryptoMetadata;
52+
indexKmsKey = kmsMetadata.getKmsKeyArn().orElse(null);
53+
indexEncContext = kmsMetadata.getKmsEncryptionContext().orElse(null);
54+
}
55+
56+
String kmsKey = (indexKmsKey != null) ? indexKmsKey : blobStore.serverSideEncryptionKmsKey();
57+
String encContext = mergeAndEncodeEncryptionContexts(indexEncContext, blobStore.serverSideEncryptionEncryptionContext());
58+
2459
builder.serverSideEncryption(ServerSideEncryption.AWS_KMS);
25-
builder.ssekmsKeyId(blobStore.serverSideEncryptionKmsKey());
60+
builder.ssekmsKeyId(kmsKey);
2661
builder.bucketKeyEnabled(blobStore.serverSideEncryptionBucketKey());
27-
builder.ssekmsEncryptionContext(blobStore.serverSideEncryptionEncryptionContext());
62+
builder.ssekmsEncryptionContext(encContext);
2863
}
2964
}
3065

31-
public static void configureEncryptionSettings(PutObjectRequest.Builder builder, S3BlobStore blobStore) {
66+
public static void configureEncryptionSettings(
67+
PutObjectRequest.Builder builder,
68+
S3BlobStore blobStore,
69+
@Nullable CryptoMetadata cryptoMetadata
70+
) {
3271
if (blobStore.serverSideEncryptionType().equals(ServerSideEncryption.AES256.toString())) {
3372
builder.serverSideEncryption(ServerSideEncryption.AES256);
3473
} else if (blobStore.serverSideEncryptionType().equals(ServerSideEncryption.AWS_KMS.toString())) {
74+
String indexKmsKey = null;
75+
String indexEncContext = null;
76+
77+
if (cryptoMetadata instanceof KmsCryptoMetadata) {
78+
KmsCryptoMetadata kmsMetadata = (KmsCryptoMetadata) cryptoMetadata;
79+
indexKmsKey = kmsMetadata.getKmsKeyArn().orElse(null);
80+
indexEncContext = kmsMetadata.getKmsEncryptionContext().orElse(null);
81+
}
82+
83+
String kmsKey = (indexKmsKey != null) ? indexKmsKey : blobStore.serverSideEncryptionKmsKey();
84+
String encContext = mergeAndEncodeEncryptionContexts(indexEncContext, blobStore.serverSideEncryptionEncryptionContext());
85+
3586
builder.serverSideEncryption(ServerSideEncryption.AWS_KMS);
36-
builder.ssekmsKeyId(blobStore.serverSideEncryptionKmsKey());
87+
builder.ssekmsKeyId(kmsKey);
3788
builder.bucketKeyEnabled(blobStore.serverSideEncryptionBucketKey());
38-
builder.ssekmsEncryptionContext(blobStore.serverSideEncryptionEncryptionContext());
89+
builder.ssekmsEncryptionContext(encContext);
3990
}
4091
}
4192

93+
public static void configureEncryptionSettings(PutObjectRequest.Builder builder, S3BlobStore blobStore) {
94+
configureEncryptionSettings(builder, blobStore, null);
95+
}
96+
4297
public static void configureEncryptionSettings(CreateMultipartUploadRequest.Builder builder, UploadRequest uploadRequest) {
4398
if (uploadRequest.getServerSideEncryptionType().equals(ServerSideEncryption.AES256.toString())) {
4499
builder.serverSideEncryption(ServerSideEncryption.AES256);
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.repositories.s3;
10+
11+
import org.opensearch.cluster.metadata.CryptoMetadata;
12+
import org.opensearch.common.settings.Settings;
13+
import org.opensearch.repositories.s3.utils.SseKmsUtil;
14+
import org.opensearch.test.OpenSearchTestCase;
15+
16+
import java.nio.charset.StandardCharsets;
17+
import java.util.Base64;
18+
19+
public class S3BlobContainerEncryptionTests extends OpenSearchTestCase {
20+
21+
public void testEncryptionContextMerging() {
22+
String indexContext = "tenant=acme,classification=confidential";
23+
String repoContext = Base64.getEncoder().encodeToString("{\"repo\":\"test\",\"env\":\"prod\"}".getBytes(StandardCharsets.UTF_8));
24+
25+
String merged = SseKmsUtil.mergeAndEncodeEncryptionContexts(indexContext, repoContext);
26+
assertNotNull(merged);
27+
28+
String decoded = new String(Base64.getDecoder().decode(merged), StandardCharsets.UTF_8);
29+
assertTrue(decoded.contains("\"tenant\":\"acme\""));
30+
assertTrue(decoded.contains("\"classification\":\"confidential\""));
31+
assertTrue(decoded.contains("\"repo\":\"test\""));
32+
assertTrue(decoded.contains("\"env\":\"prod\""));
33+
}
34+
35+
public void testCryptoMetadataExtraction() {
36+
Settings cryptoSettings = Settings.builder()
37+
.put("kms.key_arn", "arn:aws:kms:us-east-1:123456789:key/index-key")
38+
.put("kms.encryption_context", "tenant=acme,env=staging")
39+
.build();
40+
CryptoMetadata cryptoMetadata = new CryptoMetadata("index-provider", "aws-kms", cryptoSettings);
41+
42+
assertEquals("arn:aws:kms:us-east-1:123456789:key/index-key", cryptoMetadata.settings().get("kms.key_arn"));
43+
assertEquals("tenant=acme,env=staging", cryptoMetadata.settings().get("kms.encryption_context"));
44+
}
45+
}

plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -757,7 +757,8 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException, W
757757
anyString(),
758758
any(InputStream.class),
759759
anyLong(),
760-
anyMap()
760+
anyMap(),
761+
null
761762
);
762763

763764
if (expectException) {

plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ public void testExecuteSingleUploadBlobSizeTooLarge() {
134134

135135
final IllegalArgumentException e = expectThrows(
136136
IllegalArgumentException.class,
137-
() -> blobContainer.executeSingleUpload(blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize, null)
137+
() -> blobContainer.executeSingleUpload(blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize, null, null)
138138
);
139139
assertEquals("Upload request size [" + blobSize + "] can't be larger than 5gb", e.getMessage());
140140
}
@@ -153,6 +153,7 @@ public void testExecuteSingleUploadBlobSizeLargerThanBufferSize() {
153153
blobName,
154154
new ByteArrayInputStream(new byte[0]),
155155
ByteSizeUnit.MB.toBytes(2),
156+
null,
156157
null
157158
)
158159
);
@@ -665,7 +666,7 @@ public void testExecuteSingleUpload() throws IOException {
665666
when(client.putObject(putReqCaptor.capture(), bodyCaptor.capture())).thenReturn(PutObjectResponse.builder().build());
666667

667668
// Pass the known-length stream + tell the code the exact size
668-
blobContainer.executeSingleUpload(blobStore, blobName, inputStream, blobSize, metadata);
669+
blobContainer.executeSingleUpload(blobStore, blobName, inputStream, blobSize, metadata, null);
669670

670671
final PutObjectRequest request = putReqCaptor.getValue();
671672
final RequestBody requestBody = bodyCaptor.getValue();
@@ -703,7 +704,7 @@ public void testExecuteMultipartUploadBlobSizeTooLarge() {
703704

704705
final IllegalArgumentException e = expectThrows(
705706
IllegalArgumentException.class,
706-
() -> blobContainer.executeMultipartUpload(blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize, null)
707+
() -> blobContainer.executeMultipartUpload(blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize, null, null)
707708
);
708709
assertEquals("Multipart upload request size [" + blobSize + "] can't be larger than 5tb", e.getMessage());
709710
}
@@ -715,7 +716,7 @@ public void testExecuteMultipartUploadBlobSizeTooSmall() {
715716

716717
final IllegalArgumentException e = expectThrows(
717718
IllegalArgumentException.class,
718-
() -> blobContainer.executeMultipartUpload(blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize, null)
719+
() -> blobContainer.executeMultipartUpload(blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize, null, null)
719720
);
720721
assertEquals("Multipart upload request size [" + blobSize + "] can't be smaller than 5mb", e.getMessage());
721722
}
@@ -803,7 +804,7 @@ public void testExecuteMultipartUpload() throws IOException {
803804

804805
final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[0]);
805806
final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);
806-
blobContainer.executeMultipartUpload(blobStore, blobName, inputStream, blobSize, metadata);
807+
blobContainer.executeMultipartUpload(blobStore, blobName, inputStream, blobSize, metadata, null);
807808

808809
final CreateMultipartUploadRequest initRequest = createMultipartUploadRequestArgumentCaptor.getValue();
809810
assertEquals(bucketName, initRequest.bucket());
@@ -933,7 +934,7 @@ public void testExecuteMultipartUploadAborted() {
933934

934935
final IOException e = expectThrows(IOException.class, () -> {
935936
final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);
936-
blobContainer.executeMultipartUpload(blobStore, blobName, new ByteArrayInputStream(new byte[0]), blobSize, null);
937+
blobContainer.executeMultipartUpload(blobStore, blobName, new ByteArrayInputStream(new byte[0]), blobSize, null, null);
937938
});
938939

939940
assertEquals("Unable to upload object [" + blobName + "] using multipart upload", e.getMessage());

0 commit comments

Comments
 (0)