Commit efd5dee

Author: Uday Bhaskar (committed)

add index-level-encryption support for snapshots and remote-store

Signed-off-by: Uday Bhaskar <[email protected]>

1 parent 94fdafc, commit efd5dee

39 files changed: +1647, -147 lines changed

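Before the per-file diffs: the new code paths below key off a CryptoMetadata object whose settings carry the index-level SSE-KMS key and encryption context. A minimal sketch of that input, with placeholder values, using the same keys the commit's unit test exercises (per the comment in S3BlobContainer, these originate from the index's "index.store.crypto.kms.*" settings):

import org.opensearch.cluster.metadata.CryptoMetadata;
import org.opensearch.common.settings.Settings;

// Sketch only: the ARN and context values are placeholders; the "kms.*" keys and the
// "aws-kms" provider type are the ones the new S3BlobContainer code reads.
Settings cryptoSettings = Settings.builder()
    .put("kms.key_arn", "arn:aws:kms:us-east-1:111122223333:key/example-index-key")
    .put("kms.encryption_context", "tenant=acme,classification=confidential")
    .build();
CryptoMetadata cryptoMetadata = new CryptoMetadata("index-provider", "aws-kms", cryptoSettings);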

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

Lines changed: 64 additions & 13 deletions
@@ -32,6 +32,8 @@
 
 package org.opensearch.repositories.s3;
 
+import org.opensearch.cluster.metadata.CryptoMetadata;
+import org.opensearch.repositories.s3.utils.SseKmsUtil;
 import software.amazon.awssdk.core.ResponseInputStream;
 import software.amazon.awssdk.core.async.AsyncResponseTransformer;
 import software.amazon.awssdk.core.exception.SdkException;
@@ -191,7 +193,7 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
     }
 
     /**
-     * Write blob with its object metadata.
+     * Write blob with its object metadata and optional encryption settings.
      */
     @ExperimentalApi
     @Override
@@ -200,21 +202,66 @@ public void writeBlobWithMetadata(
         InputStream inputStream,
         long blobSize,
         boolean failIfAlreadyExists,
-        @Nullable Map<String, String> metadata
+        @Nullable Map<String, String> metadata,
+        @Nullable CryptoMetadata cryptoMetadata
     ) throws IOException {
+        final String indexKmsKey;
+        final String indexEncContext;
+
+        if (cryptoMetadata != null && "aws-kms".equals(cryptoMetadata.keyProviderType())) {
+            // Extract index-level SSE-KMS key and context from CryptoMetadata.settings()
+            // Note: Index stores as "index.store.crypto.kms.*" → becomes "kms.*" in settings
+            indexKmsKey = cryptoMetadata.settings().get("kms.key_arn");
+            indexEncContext = cryptoMetadata.settings().get("kms.encryption_context");
+        } else {
+            indexKmsKey = null;
+            indexEncContext = null;
+        }
+
         assert inputStream.markSupported() : "No mark support on inputStream breaks the S3 SDK's ability to retry requests";
         SocketAccess.doPrivilegedIOException(() -> {
             if (blobSize <= getLargeBlobThresholdInBytes()) {
-                executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata);
+                executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata, indexKmsKey, indexEncContext);
             } else {
-                executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata);
+                executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata, indexKmsKey, indexEncContext);
             }
             return null;
         });
     }
 
+    /**
+     * Write blob with its object metadata.
+     */
+    @ExperimentalApi
+    @Override
+    public void writeBlobWithMetadata(
+        String blobName,
+        InputStream inputStream,
+        long blobSize,
+        boolean failIfAlreadyExists,
+        @Nullable Map<String, String> metadata
+    ) throws IOException {
+        // Delegate to crypto-aware version with null CryptoMetadata
+        writeBlobWithMetadata(blobName, inputStream, blobSize, failIfAlreadyExists, metadata, null);
+    }
+
     @Override
     public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> completionListener) throws IOException {
+        // Extract index-level SSE-KMS settings from WriteContext CryptoMetadata
+        String indexKmsKey = null;
+        String indexEncContext = null;
+
+        CryptoMetadata crypto = writeContext.getCryptoMetadata();
+        if (crypto != null && "aws-kms".equals(crypto.keyProviderType())) {
+            indexKmsKey = crypto.settings().get("kms.key_arn");
+            indexEncContext = crypto.settings().get("kms.encryption_context");
+        }
+
+        // MERGE encryption contexts
+        String mergeEncContext = SseKmsUtil.mergeAndEncodeEncryptionContexts(indexEncContext,
+            blobStore.serverSideEncryptionEncryptionContext()
+        );
+
         UploadRequest uploadRequest = new UploadRequest(
             blobStore.bucket(),
             buildKey(writeContext.getFileName()),
@@ -226,9 +273,9 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
             blobStore.isUploadRetryEnabled(),
             writeContext.getMetadata(),
             blobStore.serverSideEncryptionType(),
-            blobStore.serverSideEncryptionKmsKey(),
+            indexKmsKey != null ? indexKmsKey : blobStore.serverSideEncryptionKmsKey(),
             blobStore.serverSideEncryptionBucketKey(),
-            blobStore.serverSideEncryptionEncryptionContext(),
+            mergeEncContext,
            blobStore.expectedBucketOwner()
         );
         try {
@@ -250,7 +297,9 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
                     uploadRequest.getKey(),
                     inputStream.getInputStream(),
                     uploadRequest.getContentLength(),
-                    uploadRequest.getMetadata()
+                    uploadRequest.getMetadata(),
+                    indexKmsKey,
+                    indexEncContext
                 );
                 completionListener.onResponse(null);
             } catch (Exception ex) {
@@ -536,8 +585,9 @@ void executeSingleUpload(
         final String blobName,
         final InputStream input,
         final long blobSize,
-        final Map<String, String> metadata
-    ) throws IOException {
+        final Map<String, String> metadata,
+        String indexKmsKey,
+        String indexEncContext) throws IOException {
 
         // Extra safety checks
         if (blobSize > MAX_FILE_SIZE.getBytes()) {
@@ -559,7 +609,8 @@ void executeSingleUpload(
         if (CollectionUtils.isNotEmpty(metadata)) {
             putObjectRequestBuilder = putObjectRequestBuilder.metadata(metadata);
         }
-        configureEncryptionSettings(putObjectRequestBuilder, blobStore);
+
+        configureEncryptionSettings(putObjectRequestBuilder, blobStore, indexKmsKey, indexEncContext);
 
         PutObjectRequest putObjectRequest = putObjectRequestBuilder.build();
         try (AmazonS3Reference clientReference = blobStore.clientReference()) {
@@ -585,8 +636,8 @@ void executeMultipartUpload(
         final String blobName,
         final InputStream input,
         final long blobSize,
-        final Map<String, String> metadata
-    ) throws IOException {
+        final Map<String, String> metadata,
+        String indexKmsKey, String indexEncContext) throws IOException {
 
         ensureMultiPartUploadSize(blobSize);
         final long partSize = blobStore.bufferSizeInBytes();
@@ -616,7 +667,7 @@ void executeMultipartUpload(
             createMultipartUploadRequestBuilder.metadata(metadata);
         }
 
-        configureEncryptionSettings(createMultipartUploadRequestBuilder, blobStore);
+        configureEncryptionSettings(createMultipartUploadRequestBuilder, blobStore, indexKmsKey, indexEncContext);
 
         final InputStream requestInputStream;
         if (blobStore.isUploadRetryEnabled()) {
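
Net effect of the S3BlobContainer changes: callers can pass per-index CryptoMetadata through the new writeBlobWithMetadata overload, and the single-part, multipart, and async upload paths all honour it, falling back to repository-level SSE settings when it is null. A hypothetical caller might look like the following; the blob name, metadata, and the blobContainer/cryptoMetadata variables are illustrative, with cryptoMetadata built as in the sketch near the top of this diff.

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;

byte[] data = "example payload".getBytes(StandardCharsets.UTF_8);
try (InputStream in = new ByteArrayInputStream(data)) {   // mark-supported, as the assert requires
    blobContainer.writeBlobWithMetadata(
        "example-blob",            // blob name (illustrative)
        in,
        data.length,
        true,                      // failIfAlreadyExists
        Map.of("purpose", "demo"), // object metadata (illustrative)
        cryptoMetadata             // null => repository-level SSE settings apply
    );
}
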
plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/utils/EncryptionContextUtils.java

Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
+package org.opensearch.repositories.s3.utils;
+
+public class EncryptionContextUtils {
+}
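
The org.opensearch.repositories.s3.utils.EncryptionContextUtils added above is still an empty placeholder in this commit; the merge logic that SseKmsUtil delegates to lives in org.opensearch.repositories.blobstore.EncryptionContextUtils, which is not shown in this diff. Purely as an illustration of the behaviour its Javadoc and the new S3BlobContainerEncryptionTests describe (parse a key=value index context, decode the Base64 JSON repository context, merge, and Base64-encode the combined JSON), a standalone sketch could look like the following; it is not the project's implementation.

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative sketch only (not OpenSearch's EncryptionContextUtils).
final class EncryptionContextMergeSketch {
    static String mergeAndEncode(String indexCtx, String repoCtxBase64) {
        if (indexCtx == null && repoCtxBase64 == null) {
            return null;
        }
        Map<String, String> merged = new LinkedHashMap<>();
        if (repoCtxBase64 != null) {
            // Naive flat-JSON parse, adequate for {"k":"v",...} in a sketch.
            String json = new String(Base64.getDecoder().decode(repoCtxBase64), StandardCharsets.UTF_8);
            for (String pair : json.replaceAll("[{}\"]", "").split(",")) {
                String[] kv = pair.split(":", 2);
                if (kv.length == 2) merged.put(kv[0].trim(), kv[1].trim());
            }
        }
        if (indexCtx != null) {
            // "tenant=acme,classification=confidential" style index context.
            // Precedence on key collisions is arbitrary here; the real utility defines it.
            for (String pair : indexCtx.split(",")) {
                String[] kv = pair.split("=", 2);
                if (kv.length == 2) merged.put(kv[0].trim(), kv[1].trim());
            }
        }
        String json = merged.entrySet().stream()
            .map(e -> "\"" + e.getKey() + "\":\"" + e.getValue() + "\"")
            .collect(Collectors.joining(",", "{", "}"));
        return Base64.getEncoder().encodeToString(json.getBytes(StandardCharsets.UTF_8));
    }
}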

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/utils/SseKmsUtil.java

Lines changed: 62 additions & 6 deletions
@@ -8,6 +8,7 @@
 
 package org.opensearch.repositories.s3.utils;
 
+import org.opensearch.common.Nullable;
 import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
 import software.amazon.awssdk.services.s3.model.PutObjectRequest;
 import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
@@ -16,29 +17,84 @@
 import org.opensearch.repositories.s3.async.UploadRequest;
 
 public class SseKmsUtil {
+    /**
+     * Merges index-level and repository-level encryption contexts, converts to JSON format if needed,
+     * and Base64 encodes for S3.
+     * <p>
+     * Delegates to centralized EncryptionContextUtils for consistent behavior.
+     *
+     * @param indexEncContext Index-level encryption context - can be cryptofs or JSON format
+     * @param repoEncContext Repository-level encryption context - already Base64 encoded JSON
+     * @return Base64 encoded merged JSON encryption context, or null if both are null
+     */
+    public static String mergeAndEncodeEncryptionContexts(
+        @Nullable String indexEncContext,
+        @Nullable String repoEncContext
+    ) {
+        return org.opensearch.repositories.blobstore.EncryptionContextUtils.mergeAndEncodeEncryptionContexts(
+            indexEncContext,
+            repoEncContext
+        );
+    }
 
-    public static void configureEncryptionSettings(CreateMultipartUploadRequest.Builder builder, S3BlobStore blobStore) {
+    // CreateMultipartUploadRequest with S3BlobStore (with index override)
+    public static void configureEncryptionSettings(
+        CreateMultipartUploadRequest.Builder builder,
+        S3BlobStore blobStore,
+        @Nullable String indexKmsKey,
+        @Nullable String indexEncContext
+    ) {
         if (blobStore.serverSideEncryptionType().equals(ServerSideEncryption.AES256.toString())) {
             builder.serverSideEncryption(ServerSideEncryption.AES256);
         } else if (blobStore.serverSideEncryptionType().equals(ServerSideEncryption.AWS_KMS.toString())) {
+            // Priority: index key → repo key
+            String kmsKey = (indexKmsKey != null) ? indexKmsKey : blobStore.serverSideEncryptionKmsKey();
+
+            // MERGE encryption contexts: EncA (index) + EncB (repository)
+            // This ensures KMS grants work correctly with combined context
+            String encContext = mergeAndEncodeEncryptionContexts(
+                indexEncContext,
+                blobStore.serverSideEncryptionEncryptionContext()
+            );
+
             builder.serverSideEncryption(ServerSideEncryption.AWS_KMS);
-            builder.ssekmsKeyId(blobStore.serverSideEncryptionKmsKey());
+            builder.ssekmsKeyId(kmsKey);
             builder.bucketKeyEnabled(blobStore.serverSideEncryptionBucketKey());
-            builder.ssekmsEncryptionContext(blobStore.serverSideEncryptionEncryptionContext());
+            builder.ssekmsEncryptionContext(encContext);
         }
     }
 
-    public static void configureEncryptionSettings(PutObjectRequest.Builder builder, S3BlobStore blobStore) {
+    // PutObjectRequest with S3BlobStore (with index override)
+    public static void configureEncryptionSettings(
+        PutObjectRequest.Builder builder,
+        S3BlobStore blobStore,
+        @Nullable String indexKmsKey,
+        @Nullable String indexEncContext
+    ) {
         if (blobStore.serverSideEncryptionType().equals(ServerSideEncryption.AES256.toString())) {
             builder.serverSideEncryption(ServerSideEncryption.AES256);
         } else if (blobStore.serverSideEncryptionType().equals(ServerSideEncryption.AWS_KMS.toString())) {
+            // Priority: index key > repo key
+            String kmsKey = (indexKmsKey != null) ? indexKmsKey : blobStore.serverSideEncryptionKmsKey();
+
+            // MERGE encryption contexts: index + repository
+            // This ensures KMS grants work correctly with combined context
+            String encContext = mergeAndEncodeEncryptionContexts(
+                indexEncContext,
+                blobStore.serverSideEncryptionEncryptionContext()
+            );
+
             builder.serverSideEncryption(ServerSideEncryption.AWS_KMS);
-            builder.ssekmsKeyId(blobStore.serverSideEncryptionKmsKey());
+            builder.ssekmsKeyId(kmsKey);
             builder.bucketKeyEnabled(blobStore.serverSideEncryptionBucketKey());
-            builder.ssekmsEncryptionContext(blobStore.serverSideEncryptionEncryptionContext());
+            builder.ssekmsEncryptionContext(encContext);
         }
     }
 
+    public static void configureEncryptionSettings(PutObjectRequest.Builder builder, S3BlobStore blobStore) {
+        configureEncryptionSettings(builder, blobStore, null, null);
+    }
+
     public static void configureEncryptionSettings(CreateMultipartUploadRequest.Builder builder, UploadRequest uploadRequest) {
         if (uploadRequest.getServerSideEncryptionType().equals(ServerSideEncryption.AES256.toString())) {
             builder.serverSideEncryption(ServerSideEncryption.AES256);
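
As a rough usage sketch of the new override-aware overload (mocking S3BlobStore with Mockito, as the plugin's existing unit tests do; all key ARNs and context values are placeholders), the index-level key takes priority over the repository key and the two contexts are merged onto the request builder:

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Base64;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;

// Illustrative fragment only; values are placeholders.
S3BlobStore blobStore = mock(S3BlobStore.class);
when(blobStore.serverSideEncryptionType()).thenReturn(ServerSideEncryption.AWS_KMS.toString());
when(blobStore.serverSideEncryptionKmsKey()).thenReturn("arn:aws:kms:us-east-1:111122223333:key/repo-key");
when(blobStore.serverSideEncryptionBucketKey()).thenReturn(false);
when(blobStore.serverSideEncryptionEncryptionContext())
    .thenReturn(Base64.getEncoder().encodeToString("{\"repo\":\"test\"}".getBytes()));

PutObjectRequest.Builder builder = PutObjectRequest.builder().bucket("bucket").key("object");
SseKmsUtil.configureEncryptionSettings(
    builder,
    blobStore,
    "arn:aws:kms:us-east-1:111122223333:key/index-key", // index key overrides the repo key
    "tenant=acme"                                        // merged with the repository context
);
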
plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerEncryptionTests.java

Lines changed: 44 additions & 0 deletions
@@ -0,0 +1,44 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.repositories.s3;
+
+import org.opensearch.cluster.metadata.CryptoMetadata;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.repositories.s3.utils.SseKmsUtil;
+import org.opensearch.test.OpenSearchTestCase;
+
+import java.util.Base64;
+
+public class S3BlobContainerEncryptionTests extends OpenSearchTestCase {
+
+    public void testEncryptionContextMerging() {
+        String indexContext = "tenant=acme,classification=confidential";
+        String repoContext = Base64.getEncoder().encodeToString("{\"repo\":\"test\",\"env\":\"prod\"}".getBytes());
+
+        String merged = SseKmsUtil.mergeAndEncodeEncryptionContexts(indexContext, repoContext);
+        assertNotNull(merged);
+
+        String decoded = new String(Base64.getDecoder().decode(merged));
+        assertTrue(decoded.contains("\"tenant\":\"acme\""));
+        assertTrue(decoded.contains("\"classification\":\"confidential\""));
+        assertTrue(decoded.contains("\"repo\":\"test\""));
+        assertTrue(decoded.contains("\"env\":\"prod\""));
+    }
+
+    public void testCryptoMetadataExtraction() {
+        Settings cryptoSettings = Settings.builder()
+            .put("kms.key_arn", "arn:aws:kms:us-east-1:123456789:key/index-key")
+            .put("kms.encryption_context", "tenant=acme,env=staging")
+            .build();
+        CryptoMetadata cryptoMetadata = new CryptoMetadata("index-provider", "aws-kms", cryptoSettings);
+
+        assertEquals("arn:aws:kms:us-east-1:123456789:key/index-key", cryptoMetadata.settings().get("kms.key_arn"));
+        assertEquals("tenant=acme,env=staging", cryptoMetadata.settings().get("kms.encryption_context"));
+    }
+}

plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java

Lines changed: 2 additions & 2 deletions
@@ -757,8 +757,8 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException, W
             anyString(),
             any(InputStream.class),
             anyLong(),
-            anyMap()
-        );
+            anyMap(),
+            null, null);
 
         if (expectException) {
             verify(client, times(1)).abortMultipartUpload(any(AbortMultipartUploadRequest.class));

plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java

Lines changed: 8 additions & 8 deletions
@@ -134,7 +134,7 @@ public void testExecuteSingleUploadBlobSizeTooLarge() {
 
         final IllegalArgumentException e = expectThrows(
             IllegalArgumentException.class,
-            () -> blobContainer.executeSingleUpload(blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize, null)
+            () -> blobContainer.executeSingleUpload(blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize, null, null, null)
         );
         assertEquals("Upload request size [" + blobSize + "] can't be larger than 5gb", e.getMessage());
     }
@@ -153,8 +153,8 @@ public void testExecuteSingleUploadBlobSizeLargerThanBufferSize() {
                 blobName,
                 new ByteArrayInputStream(new byte[0]),
                 ByteSizeUnit.MB.toBytes(2),
-                null
-            )
+                null,
+                null, null)
         );
         assertEquals("Upload request size [2097152] can't be larger than buffer size", e.getMessage());
     }
@@ -665,7 +665,7 @@ public void testExecuteSingleUpload() throws IOException {
         when(client.putObject(putReqCaptor.capture(), bodyCaptor.capture())).thenReturn(PutObjectResponse.builder().build());
 
         // Pass the known-length stream + tell the code the exact size
-        blobContainer.executeSingleUpload(blobStore, blobName, inputStream, blobSize, metadata);
+        blobContainer.executeSingleUpload(blobStore, blobName, inputStream, blobSize, metadata, null, null);
 
         final PutObjectRequest request = putReqCaptor.getValue();
         final RequestBody requestBody = bodyCaptor.getValue();
@@ -703,7 +703,7 @@ public void testExecuteMultipartUploadBlobSizeTooLarge() {
 
         final IllegalArgumentException e = expectThrows(
             IllegalArgumentException.class,
-            () -> blobContainer.executeMultipartUpload(blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize, null)
+            () -> blobContainer.executeMultipartUpload(blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize, null, null, null)
         );
         assertEquals("Multipart upload request size [" + blobSize + "] can't be larger than 5tb", e.getMessage());
     }
@@ -715,7 +715,7 @@ public void testExecuteMultipartUploadBlobSizeTooSmall() {
 
         final IllegalArgumentException e = expectThrows(
             IllegalArgumentException.class,
-            () -> blobContainer.executeMultipartUpload(blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize, null)
+            () -> blobContainer.executeMultipartUpload(blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize, null, null, null)
         );
         assertEquals("Multipart upload request size [" + blobSize + "] can't be smaller than 5mb", e.getMessage());
     }
@@ -803,7 +803,7 @@ public void testExecuteMultipartUpload() throws IOException {
 
         final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[0]);
         final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);
-        blobContainer.executeMultipartUpload(blobStore, blobName, inputStream, blobSize, metadata);
+        blobContainer.executeMultipartUpload(blobStore, blobName, inputStream, blobSize, metadata, null, null);
 
         final CreateMultipartUploadRequest initRequest = createMultipartUploadRequestArgumentCaptor.getValue();
         assertEquals(bucketName, initRequest.bucket());
@@ -933,7 +933,7 @@ public void testExecuteMultipartUploadAborted() {
 
         final IOException e = expectThrows(IOException.class, () -> {
             final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);
-            blobContainer.executeMultipartUpload(blobStore, blobName, new ByteArrayInputStream(new byte[0]), blobSize, null);
+            blobContainer.executeMultipartUpload(blobStore, blobName, new ByteArrayInputStream(new byte[0]), blobSize, null, null, null);
         });
 
         assertEquals("Unable to upload object [" + blobName + "] using multipart upload", e.getMessage());
