Skip to content

Commit 13a2cd9

Browse files
author
Uday Bhaskar
committed
add index-level-encryption support for snapshots and remote-store
Signed-off-by: Uday Bhaskar <[email protected]>
1 parent bd24685 commit 13a2cd9

File tree

44 files changed

+1941
-157
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+1941
-157
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3333
- Allow the truncate filter in normalizers ([#19778](https://github.com/opensearch-project/OpenSearch/issues/19778))
3434
- Support pull-based ingestion message mappers and raw payload support ([#19765](https://github.com/opensearch-project/OpenSearch/pull/19765))
3535
- Support dynamic consumer configuration update in pull-based ingestion ([#19963](https://github.com/opensearch-project/OpenSearch/pull/19963))
36+
- Add index-level-encryption support for snapshots and remote-store ([#20095](https://github.com/opensearch-project/OpenSearch/pull/20095))
3637

3738
### Changed
3839
- Faster `terms` query creation for `keyword` field with index and docValues enabled ([#19350](https://github.com/opensearch-project/OpenSearch/pull/19350))

plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureBlobContainer.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.opensearch.action.ActionRunnable;
4141
import org.opensearch.action.support.GroupedActionListener;
4242
import org.opensearch.action.support.PlainActionFuture;
43+
import org.opensearch.cluster.metadata.CryptoMetadata;
4344
import org.opensearch.common.Nullable;
4445
import org.opensearch.common.blobstore.BlobContainer;
4546
import org.opensearch.common.blobstore.BlobMetadata;
@@ -136,6 +137,18 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
136137
}
137138
}
138139

140+
@Override
141+
public void writeBlobWithMetadata(
142+
String blobName,
143+
InputStream inputStream,
144+
long blobSize,
145+
boolean failIfAlreadyExists,
146+
@Nullable Map<String, String> metadata,
147+
@Nullable CryptoMetadata cryptoMetadata
148+
) throws IOException {
149+
writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
150+
}
151+
139152
@Override
140153
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
141154
writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

Lines changed: 55 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@
3232

3333
package org.opensearch.repositories.s3;
3434

35+
import org.opensearch.cluster.metadata.CryptoMetadata;
36+
import org.opensearch.cluster.metadata.KmsCryptoMetadata;
37+
import org.opensearch.repositories.s3.utils.SseKmsUtil;
3538
import software.amazon.awssdk.core.ResponseInputStream;
3639
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
3740
import software.amazon.awssdk.core.exception.SdkException;
@@ -192,7 +195,7 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
192195
}
193196

194197
/**
195-
* Write blob with its object metadata.
198+
* Write blob with its object metadata and optional encryption settings.
196199
*/
197200
@ExperimentalApi
198201
@Override
@@ -201,20 +204,51 @@ public void writeBlobWithMetadata(
201204
InputStream inputStream,
202205
long blobSize,
203206
boolean failIfAlreadyExists,
204-
@Nullable Map<String, String> metadata
207+
@Nullable Map<String, String> metadata,
208+
@Nullable CryptoMetadata cryptoMetadata
205209
) throws IOException {
206210
assert inputStream.markSupported() : "No mark support on inputStream breaks the S3 SDK's ability to retry requests";
207211
AccessController.doPrivilegedChecked(() -> {
208212
if (blobSize <= getLargeBlobThresholdInBytes()) {
209-
executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata);
213+
executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata, cryptoMetadata);
210214
} else {
211-
executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata);
215+
executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata, cryptoMetadata);
212216
}
213217
});
214218
}
215219

220+
/**
221+
* Write blob with its object metadata.
222+
*/
223+
@ExperimentalApi
224+
@Override
225+
public void writeBlobWithMetadata(
226+
String blobName,
227+
InputStream inputStream,
228+
long blobSize,
229+
boolean failIfAlreadyExists,
230+
@Nullable Map<String, String> metadata
231+
) throws IOException {
232+
// Delegate to crypto-aware version with null CryptoMetadata
233+
writeBlobWithMetadata(blobName, inputStream, blobSize, failIfAlreadyExists, metadata, null);
234+
}
235+
216236
@Override
217237
public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> completionListener) throws IOException {
238+
CryptoMetadata crypto = writeContext.getCryptoMetadata();
239+
240+
String indexKmsKey = null;
241+
String mergeEncContext = null;
242+
243+
if (crypto instanceof KmsCryptoMetadata) {
244+
KmsCryptoMetadata kmsMetadata = (KmsCryptoMetadata) crypto;
245+
indexKmsKey = kmsMetadata.getKmsKeyArn().orElse(null);
246+
mergeEncContext = SseKmsUtil.mergeAndEncodeEncryptionContexts(
247+
kmsMetadata.getKmsEncryptionContext().orElse(null),
248+
blobStore.serverSideEncryptionEncryptionContext()
249+
);
250+
}
251+
218252
UploadRequest uploadRequest = new UploadRequest(
219253
blobStore.bucket(),
220254
buildKey(writeContext.getFileName()),
@@ -226,9 +260,9 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
226260
blobStore.isUploadRetryEnabled(),
227261
writeContext.getMetadata(),
228262
blobStore.serverSideEncryptionType(),
229-
blobStore.serverSideEncryptionKmsKey(),
263+
indexKmsKey != null ? indexKmsKey : blobStore.serverSideEncryptionKmsKey(),
230264
blobStore.serverSideEncryptionBucketKey(),
231-
blobStore.serverSideEncryptionEncryptionContext(),
265+
mergeEncContext,
232266
blobStore.expectedBucketOwner()
233267
);
234268
try {
@@ -245,13 +279,14 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
245279
);
246280
InputStreamContainer inputStream = streamContext.provideStream(0);
247281
try {
248-
executeMultipartUpload(
249-
blobStore,
250-
uploadRequest.getKey(),
251-
inputStream.getInputStream(),
252-
uploadRequest.getContentLength(),
253-
uploadRequest.getMetadata()
254-
);
282+
executeMultipartUpload(
283+
blobStore,
284+
uploadRequest.getKey(),
285+
inputStream.getInputStream(),
286+
uploadRequest.getContentLength(),
287+
uploadRequest.getMetadata(),
288+
crypto
289+
);
255290
completionListener.onResponse(null);
256291
} catch (Exception ex) {
257292
logger.error(
@@ -536,8 +571,8 @@ void executeSingleUpload(
536571
final String blobName,
537572
final InputStream input,
538573
final long blobSize,
539-
final Map<String, String> metadata
540-
) throws IOException {
574+
final Map<String, String> metadata,
575+
@Nullable CryptoMetadata cryptoMetadata) throws IOException {
541576

542577
// Extra safety checks
543578
if (blobSize > MAX_FILE_SIZE.getBytes()) {
@@ -559,7 +594,8 @@ void executeSingleUpload(
559594
if (CollectionUtils.isNotEmpty(metadata)) {
560595
putObjectRequestBuilder = putObjectRequestBuilder.metadata(metadata);
561596
}
562-
configureEncryptionSettings(putObjectRequestBuilder, blobStore);
597+
598+
configureEncryptionSettings(putObjectRequestBuilder, blobStore, cryptoMetadata);
563599

564600
PutObjectRequest putObjectRequest = putObjectRequestBuilder.build();
565601
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
@@ -585,8 +621,8 @@ void executeMultipartUpload(
585621
final String blobName,
586622
final InputStream input,
587623
final long blobSize,
588-
final Map<String, String> metadata
589-
) throws IOException {
624+
final Map<String, String> metadata,
625+
@Nullable CryptoMetadata cryptoMetadata) throws IOException {
590626

591627
ensureMultiPartUploadSize(blobSize);
592628
final long partSize = blobStore.bufferSizeInBytes();
@@ -616,7 +652,7 @@ void executeMultipartUpload(
616652
createMultipartUploadRequestBuilder.metadata(metadata);
617653
}
618654

619-
configureEncryptionSettings(createMultipartUploadRequestBuilder, blobStore);
655+
configureEncryptionSettings(createMultipartUploadRequestBuilder, blobStore, cryptoMetadata);
620656

621657
final InputStream requestInputStream;
622658
if (blobStore.isUploadRetryEnabled()) {

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/utils/SseKmsUtil.java

Lines changed: 70 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88

99
package org.opensearch.repositories.s3.utils;
1010

11+
import org.opensearch.cluster.metadata.CryptoMetadata;
12+
import org.opensearch.cluster.metadata.KmsCryptoMetadata;
13+
import org.opensearch.common.Nullable;
1114
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
1215
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
1316
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
@@ -16,29 +19,90 @@
1619
import org.opensearch.repositories.s3.async.UploadRequest;
1720

1821
public class SseKmsUtil {
22+
/**
23+
* Merges index-level and repository-level encryption contexts, converts to JSON format if needed,
24+
* and Base64 encodes for S3.
25+
* <p>
26+
* Delegates to centralized EncryptionContextUtils for consistent behavior.
27+
*
28+
* @param indexEncContext Index-level encryption context - can be cryptofs or JSON format
29+
* @param repoEncContext Repository-level encryption context - already Base64 encoded JSON
30+
* @return Base64 encoded merged JSON encryption context, or null if both are null
31+
*/
32+
public static String mergeAndEncodeEncryptionContexts(
33+
@Nullable String indexEncContext,
34+
@Nullable String repoEncContext
35+
) {
36+
return org.opensearch.repositories.blobstore.EncryptionContextUtils.mergeAndEncodeEncryptionContexts(
37+
indexEncContext,
38+
repoEncContext
39+
);
40+
}
1941

20-
public static void configureEncryptionSettings(CreateMultipartUploadRequest.Builder builder, S3BlobStore blobStore) {
42+
public static void configureEncryptionSettings(
43+
CreateMultipartUploadRequest.Builder builder,
44+
S3BlobStore blobStore,
45+
@Nullable CryptoMetadata cryptoMetadata
46+
) {
2147
if (blobStore.serverSideEncryptionType().equals(ServerSideEncryption.AES256.toString())) {
2248
builder.serverSideEncryption(ServerSideEncryption.AES256);
2349
} else if (blobStore.serverSideEncryptionType().equals(ServerSideEncryption.AWS_KMS.toString())) {
50+
String indexKmsKey = null;
51+
String indexEncContext = null;
52+
53+
if (cryptoMetadata instanceof KmsCryptoMetadata) {
54+
KmsCryptoMetadata kmsMetadata = (KmsCryptoMetadata) cryptoMetadata;
55+
indexKmsKey = kmsMetadata.getKmsKeyArn().orElse(null);
56+
indexEncContext = kmsMetadata.getKmsEncryptionContext().orElse(null);
57+
}
58+
59+
String kmsKey = (indexKmsKey != null) ? indexKmsKey : blobStore.serverSideEncryptionKmsKey();
60+
String encContext = mergeAndEncodeEncryptionContexts(
61+
indexEncContext,
62+
blobStore.serverSideEncryptionEncryptionContext()
63+
);
64+
2465
builder.serverSideEncryption(ServerSideEncryption.AWS_KMS);
25-
builder.ssekmsKeyId(blobStore.serverSideEncryptionKmsKey());
66+
builder.ssekmsKeyId(kmsKey);
2667
builder.bucketKeyEnabled(blobStore.serverSideEncryptionBucketKey());
27-
builder.ssekmsEncryptionContext(blobStore.serverSideEncryptionEncryptionContext());
68+
builder.ssekmsEncryptionContext(encContext);
2869
}
2970
}
3071

31-
public static void configureEncryptionSettings(PutObjectRequest.Builder builder, S3BlobStore blobStore) {
72+
public static void configureEncryptionSettings(
73+
PutObjectRequest.Builder builder,
74+
S3BlobStore blobStore,
75+
@Nullable CryptoMetadata cryptoMetadata
76+
) {
3277
if (blobStore.serverSideEncryptionType().equals(ServerSideEncryption.AES256.toString())) {
3378
builder.serverSideEncryption(ServerSideEncryption.AES256);
3479
} else if (blobStore.serverSideEncryptionType().equals(ServerSideEncryption.AWS_KMS.toString())) {
80+
String indexKmsKey = null;
81+
String indexEncContext = null;
82+
83+
if (cryptoMetadata instanceof KmsCryptoMetadata) {
84+
KmsCryptoMetadata kmsMetadata = (KmsCryptoMetadata) cryptoMetadata;
85+
indexKmsKey = kmsMetadata.getKmsKeyArn().orElse(null);
86+
indexEncContext = kmsMetadata.getKmsEncryptionContext().orElse(null);
87+
}
88+
89+
String kmsKey = (indexKmsKey != null) ? indexKmsKey : blobStore.serverSideEncryptionKmsKey();
90+
String encContext = mergeAndEncodeEncryptionContexts(
91+
indexEncContext,
92+
blobStore.serverSideEncryptionEncryptionContext()
93+
);
94+
3595
builder.serverSideEncryption(ServerSideEncryption.AWS_KMS);
36-
builder.ssekmsKeyId(blobStore.serverSideEncryptionKmsKey());
96+
builder.ssekmsKeyId(kmsKey);
3797
builder.bucketKeyEnabled(blobStore.serverSideEncryptionBucketKey());
38-
builder.ssekmsEncryptionContext(blobStore.serverSideEncryptionEncryptionContext());
98+
builder.ssekmsEncryptionContext(encContext);
3999
}
40100
}
41101

102+
public static void configureEncryptionSettings(PutObjectRequest.Builder builder, S3BlobStore blobStore) {
103+
configureEncryptionSettings(builder, blobStore, null);
104+
}
105+
42106
public static void configureEncryptionSettings(CreateMultipartUploadRequest.Builder builder, UploadRequest uploadRequest) {
43107
if (uploadRequest.getServerSideEncryptionType().equals(ServerSideEncryption.AES256.toString())) {
44108
builder.serverSideEncryption(ServerSideEncryption.AES256);
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.repositories.s3;
10+
11+
import org.opensearch.cluster.metadata.CryptoMetadata;
12+
import org.opensearch.common.settings.Settings;
13+
import org.opensearch.repositories.s3.utils.SseKmsUtil;
14+
import org.opensearch.test.OpenSearchTestCase;
15+
16+
import java.util.Base64;
17+
18+
public class S3BlobContainerEncryptionTests extends OpenSearchTestCase {
19+
20+
public void testEncryptionContextMerging() {
21+
String indexContext = "tenant=acme,classification=confidential";
22+
String repoContext = Base64.getEncoder().encodeToString("{\"repo\":\"test\",\"env\":\"prod\"}".getBytes());
23+
24+
String merged = SseKmsUtil.mergeAndEncodeEncryptionContexts(indexContext, repoContext);
25+
assertNotNull(merged);
26+
27+
String decoded = new String(Base64.getDecoder().decode(merged));
28+
assertTrue(decoded.contains("\"tenant\":\"acme\""));
29+
assertTrue(decoded.contains("\"classification\":\"confidential\""));
30+
assertTrue(decoded.contains("\"repo\":\"test\""));
31+
assertTrue(decoded.contains("\"env\":\"prod\""));
32+
}
33+
34+
public void testCryptoMetadataExtraction() {
35+
Settings cryptoSettings = Settings.builder()
36+
.put("kms.key_arn", "arn:aws:kms:us-east-1:123456789:key/index-key")
37+
.put("kms.encryption_context", "tenant=acme,env=staging")
38+
.build();
39+
CryptoMetadata cryptoMetadata = new CryptoMetadata("index-provider", "aws-kms", cryptoSettings);
40+
41+
assertEquals("arn:aws:kms:us-east-1:123456789:key/index-key", cryptoMetadata.settings().get("kms.key_arn"));
42+
assertEquals("tenant=acme,env=staging", cryptoMetadata.settings().get("kms.encryption_context"));
43+
}
44+
}

plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -757,8 +757,8 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException, W
757757
anyString(),
758758
any(InputStream.class),
759759
anyLong(),
760-
anyMap()
761-
);
760+
anyMap(),
761+
null, null);
762762

763763
if (expectException) {
764764
verify(client, times(1)).abortMultipartUpload(any(AbortMultipartUploadRequest.class));

0 commit comments

Comments
 (0)