From 1649fc6dfabe7245ad53a33596832b9e38822952 Mon Sep 17 00:00:00 2001 From: Sergiu Dumitriu Date: Thu, 7 Nov 2024 15:22:04 -0500 Subject: [PATCH 1/3] CARDS-2593: Exporting very big files to S3 may fail - Pass the input size to the storage - Split the upload into chunks --- .../io/uhndata/cards/export/ExportTask.java | 3 +- .../internal/formatters/CSVDataFormatter.java | 4 +- .../formatters/JSONDataFormatter.java | 4 +- .../formatters/RawFileDataFormatter.java | 5 +- .../internal/formatters/TSVDataFormatter.java | 4 +- .../internal/stores/FilesystemDataStore.java | 2 +- .../cards/export/spi/DataPipelineStep.java | 9 +++ .../uhndata/cards/export/spi/DataStore.java | 2 +- modules/s3-export/pom.xml | 8 ++ .../uhndata/cards/s3export/S3DataStore.java | 79 ++++++++++++++++--- 10 files changed, 104 insertions(+), 16 deletions(-) diff --git a/modules/export/src/main/java/io/uhndata/cards/export/ExportTask.java b/modules/export/src/main/java/io/uhndata/cards/export/ExportTask.java index 5aef96673c..22a8347ac6 100644 --- a/modules/export/src/main/java/io/uhndata/cards/export/ExportTask.java +++ b/modules/export/src/main/java/io/uhndata/cards/export/ExportTask.java @@ -210,7 +210,8 @@ private String getTargetFileName(final ResourceIdentifier identifier, final Zone private void output(ResourceRepresentation input, String filename) { try { - this.store.store(input.getRepresentation(), filename, input.getMimeType(), this.config); + this.store.store(input.getRepresentation(), input.getRepresentationSize(), filename, input.getMimeType(), + this.config); input.getDataContents().forEach(form -> { LOGGER.info("Exported {}", form); Metrics.increment(this.resolverFactory, "S3ExportedForms", 1); diff --git a/modules/export/src/main/java/io/uhndata/cards/export/internal/formatters/CSVDataFormatter.java b/modules/export/src/main/java/io/uhndata/cards/export/internal/formatters/CSVDataFormatter.java index 7142983745..22961e92f8 100644 --- 
a/modules/export/src/main/java/io/uhndata/cards/export/internal/formatters/CSVDataFormatter.java +++ b/modules/export/src/main/java/io/uhndata/cards/export/internal/formatters/CSVDataFormatter.java @@ -49,10 +49,12 @@ public ResourceRepresentation format(final ResourceIdentifier what, throws RepositoryException { Resource r = resolver.resolve(what.getExportPath() + ".csv"); + byte[] bytes = r.adaptTo(CSVString.class).toString().getBytes(StandardCharsets.UTF_8); return new ResourceRepresentation(what, // FIXME Use streaming instead of building the whole CSV in memory - new ByteArrayInputStream(r.adaptTo(CSVString.class).toString().getBytes(StandardCharsets.UTF_8)), + new ByteArrayInputStream(bytes), + bytes.length, "text/csv", getContentsSummary(what, config, resolver)); } diff --git a/modules/export/src/main/java/io/uhndata/cards/export/internal/formatters/JSONDataFormatter.java b/modules/export/src/main/java/io/uhndata/cards/export/internal/formatters/JSONDataFormatter.java index d3353c0bc1..2686901022 100644 --- a/modules/export/src/main/java/io/uhndata/cards/export/internal/formatters/JSONDataFormatter.java +++ b/modules/export/src/main/java/io/uhndata/cards/export/internal/formatters/JSONDataFormatter.java @@ -50,9 +50,11 @@ public ResourceRepresentation format(final ResourceIdentifier what, { Resource r = resolver.resolve(what.getExportPath() + ".json"); JsonObject serialization = r.adaptTo(JsonObject.class); + byte[] bytes = serialization.toString().getBytes(StandardCharsets.UTF_8); return new ResourceRepresentation(what, - new ByteArrayInputStream(serialization.toString().getBytes(StandardCharsets.UTF_8)), + new ByteArrayInputStream(bytes), + bytes.length, "application/json", getContentsSummary(what, config, resolver)); } diff --git a/modules/export/src/main/java/io/uhndata/cards/export/internal/formatters/RawFileDataFormatter.java b/modules/export/src/main/java/io/uhndata/cards/export/internal/formatters/RawFileDataFormatter.java index 
b4ad28a7ec..aa40ac3952 100644 --- a/modules/export/src/main/java/io/uhndata/cards/export/internal/formatters/RawFileDataFormatter.java +++ b/modules/export/src/main/java/io/uhndata/cards/export/internal/formatters/RawFileDataFormatter.java @@ -22,6 +22,7 @@ import java.time.ZonedDateTime; import java.util.Collections; +import javax.jcr.Binary; import javax.jcr.Node; import javax.jcr.RepositoryException; @@ -51,8 +52,10 @@ public ResourceRepresentation format(final ResourceIdentifier what, n = n.getNode("jcr:content"); } if (n.isNodeType("nt:resource") && n.hasProperty("jcr:data")) { + final Binary content = n.getProperty("jcr:data").getBinary(); return new ResourceRepresentation(what, - n.getProperty("jcr:data").getBinary().getStream(), + content.getStream(), + content.getSize(), n.hasProperty("jcr:mimeType") ? n.getProperty("jcr:mimeType").getString() : "application/octet-stream", Collections.emptyList()); } else { diff --git a/modules/export/src/main/java/io/uhndata/cards/export/internal/formatters/TSVDataFormatter.java b/modules/export/src/main/java/io/uhndata/cards/export/internal/formatters/TSVDataFormatter.java index 1d7b97a01a..d95536a7d9 100644 --- a/modules/export/src/main/java/io/uhndata/cards/export/internal/formatters/TSVDataFormatter.java +++ b/modules/export/src/main/java/io/uhndata/cards/export/internal/formatters/TSVDataFormatter.java @@ -49,10 +49,12 @@ public ResourceRepresentation format(final ResourceIdentifier what, throws RepositoryException { Resource r = resolver.resolve(what.getExportPath() + ".tsv"); + byte[] bytes = r.adaptTo(CSVString.class).toString().getBytes(StandardCharsets.UTF_8); return new ResourceRepresentation(what, // FIXME Use streaming instead of building the whole CSV in memory - new ByteArrayInputStream(r.adaptTo(CSVString.class).toString().getBytes(StandardCharsets.UTF_8)), + new ByteArrayInputStream(bytes), + bytes.length, "text/tab-separated-values", getContentsSummary(what, config, resolver)); } diff --git 
a/modules/export/src/main/java/io/uhndata/cards/export/internal/stores/FilesystemDataStore.java b/modules/export/src/main/java/io/uhndata/cards/export/internal/stores/FilesystemDataStore.java index 5703e3af61..53291f8431 100644 --- a/modules/export/src/main/java/io/uhndata/cards/export/internal/stores/FilesystemDataStore.java +++ b/modules/export/src/main/java/io/uhndata/cards/export/internal/stores/FilesystemDataStore.java @@ -41,7 +41,7 @@ public String getName() } @Override - public void store(final InputStream contents, final String filename, final String mimetype, + public void store(final InputStream contents, final long size, final String filename, final String mimetype, final ExportConfigDefinition config) throws IOException { final File targetFile = diff --git a/modules/export/src/main/java/io/uhndata/cards/export/spi/DataPipelineStep.java b/modules/export/src/main/java/io/uhndata/cards/export/spi/DataPipelineStep.java index 6ccfdd348d..026f002223 100644 --- a/modules/export/src/main/java/io/uhndata/cards/export/spi/DataPipelineStep.java +++ b/modules/export/src/main/java/io/uhndata/cards/export/spi/DataPipelineStep.java @@ -93,17 +93,21 @@ final class ResourceRepresentation private final InputStream data; + private final long size; + private final List dataContents; private final String mimeType; public ResourceRepresentation(final ResourceIdentifier identifier, final InputStream data, + final long size, final String mimeType, final List dataContents) { this.identifier = identifier; this.data = data; + this.size = size; this.mimeType = mimeType; this.dataContents = dataContents; } @@ -118,6 +122,11 @@ public InputStream getRepresentation() return this.data; } + public long getRepresentationSize() + { + return this.size; + } + public List getDataContents() { return this.dataContents; diff --git a/modules/export/src/main/java/io/uhndata/cards/export/spi/DataStore.java b/modules/export/src/main/java/io/uhndata/cards/export/spi/DataStore.java index 
2cab7d6856..e2aaf67a09 100644 --- a/modules/export/src/main/java/io/uhndata/cards/export/spi/DataStore.java +++ b/modules/export/src/main/java/io/uhndata/cards/export/spi/DataStore.java @@ -25,6 +25,6 @@ public interface DataStore extends DataPipelineStep { - void store(InputStream contents, String filename, String mimetype, ExportConfigDefinition config) + void store(InputStream contents, long size, String filename, String mimetype, ExportConfigDefinition config) throws IOException; } diff --git a/modules/s3-export/pom.xml b/modules/s3-export/pom.xml index 509de12d73..f30c08bd35 100644 --- a/modules/s3-export/pom.xml +++ b/modules/s3-export/pom.xml @@ -60,6 +60,14 @@ org.osgi org.osgi.service.component.annotations + + org.slf4j + slf4j-api + + + org.apache.commons + commons-lang3 + com.amazonaws aws-java-sdk-osgi diff --git a/modules/s3-export/src/main/java/io/uhndata/cards/s3export/S3DataStore.java b/modules/s3-export/src/main/java/io/uhndata/cards/s3export/S3DataStore.java index a8756c4702..ff973f9cd2 100644 --- a/modules/s3-export/src/main/java/io/uhndata/cards/s3export/S3DataStore.java +++ b/modules/s3-export/src/main/java/io/uhndata/cards/s3export/S3DataStore.java @@ -21,8 +21,13 @@ import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.lang3.StringUtils; import org.osgi.service.component.annotations.Component; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSStaticCredentialsProvider; @@ -30,20 +35,21 @@ import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; +import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; +import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; import 
com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.PartETag; +import com.amazonaws.services.s3.model.UploadPartRequest; +import com.amazonaws.services.s3.model.UploadPartResult; import io.uhndata.cards.export.ExportConfigDefinition; import io.uhndata.cards.export.spi.DataStore; @Component(immediate = true, service = DataStore.class) +@SuppressWarnings("checkstyle:ClassDataAbstractionCoupling") public class S3DataStore implements DataStore { - private String env(final String value) - { - if (value != null && value.startsWith("%ENV%")) { - return System.getenv(value.substring("%ENV%".length())); - } - return value; - } + private static final Logger LOGGER = LoggerFactory.getLogger(S3DataStore.class); @Override public String getName() @@ -52,7 +58,7 @@ public String getName() } @Override - public void store(final InputStream contents, final String filename, final String mimetype, + public void store(final InputStream contents, final long size, final String filename, final String mimetype, final ExportConfigDefinition config) throws IOException { final String s3EndpointUrl = @@ -70,12 +76,67 @@ public void store(final InputStream contents, final String filename, final Strin .withPathStyleAccessEnabled(true) .withCredentials(new AWSStaticCredentialsProvider(credentials)) .build(); + try { final ObjectMetadata meta = new ObjectMetadata(); meta.setContentType(mimetype); - s3.putObject(s3BucketName, filename, contents, meta); + if (size >= 0) { + meta.setContentLength(size); + } + + final List partETags = new ArrayList<>(); + + final InitiateMultipartUploadRequest initRequest = + new InitiateMultipartUploadRequest(s3BucketName, filename).withObjectMetadata(meta); + final InitiateMultipartUploadResult initResponse = s3.initiateMultipartUpload(initRequest); + long position = 0; + long partSize = getPartSize(config.storageParameters()); + for (int partNumber = 1; position < size; ++partNumber) { + partSize = Math.min(partSize, (size - 
position));
+
+                UploadPartRequest uploadRequest = new UploadPartRequest()
+                    .withBucketName(s3BucketName)
+                    .withKey(filename)
+                    .withUploadId(initResponse.getUploadId())
+                    .withInputStream(contents)
+                    .withPartNumber(partNumber)
+                    .withPartSize(partSize);
+
+                UploadPartResult uploadResult = s3.uploadPart(uploadRequest);
+                partETags.add(uploadResult.getPartETag());
+
+                position += partSize;
+            }
+
+            CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest(s3BucketName, filename,
+                initResponse.getUploadId(), partETags);
+            s3.completeMultipartUpload(compRequest);
         } catch (Exception e) {
             throw new IOException("Failed to store file " + filename + " into S3 store " + getName(), e);
         }
     }
+
+    private long getPartSize(final String[] parameters)
+    {
+        final String sizeStr = getNamedParameter(parameters, "chunkSizeInMB");
+        if (!StringUtils.isBlank(sizeStr)) {
+            try {
+                final int size = Integer.parseInt(sizeStr);
+                if (size > 0) {
+                    return size * 1024 * 1024;
+                }
+            } catch (NumberFormatException e) {
+                LOGGER.warn("Invalid chunk size configured for the S3 storage: {}", sizeStr);
+            }
+        }
+        return 10 * 1024 * 1024;
+    }
+
+    private String env(final String value)
+    {
+        if (value != null && value.startsWith("%ENV%")) {
+            return System.getenv(value.substring("%ENV%".length()));
+        }
+        return value;
+    }
 }

From cb6b1336ba22cf24de175cf4a5a03217ac8d5e1d Mon Sep 17 00:00:00 2001
From: Sergiu Dumitriu
Date: Thu, 7 Nov 2024 15:22:51 -0500
Subject: [PATCH 2/3] [misc] Enable payload signing for all S3 requests

---
 .../src/main/java/io/uhndata/cards/s3export/S3DataStore.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/modules/s3-export/src/main/java/io/uhndata/cards/s3export/S3DataStore.java b/modules/s3-export/src/main/java/io/uhndata/cards/s3export/S3DataStore.java
index ff973f9cd2..8c1dc01b51 100644
--- a/modules/s3-export/src/main/java/io/uhndata/cards/s3export/S3DataStore.java
+++ b/modules/s3-export/src/main/java/io/uhndata/cards/s3export/S3DataStore.java
@@ 
-72,6 +72,7 @@ public void store(final InputStream contents, final long size, final String file new EndpointConfiguration(s3EndpointUrl, s3EndpointRegion); final AWSCredentials credentials = new BasicAWSCredentials(awsKey, awsSecret); final AmazonS3 s3 = AmazonS3ClientBuilder.standard() + .withPayloadSigningEnabled(true) .withEndpointConfiguration(endpointConfig) .withPathStyleAccessEnabled(true) .withCredentials(new AWSStaticCredentialsProvider(credentials)) From eb807c58d9977146dc7297f6dfa7c290b0cedc6c Mon Sep 17 00:00:00 2001 From: Sergiu Dumitriu Date: Thu, 7 Nov 2024 15:24:16 -0500 Subject: [PATCH 3/3] [heracles] Set the preferred chunk size to 5MB [heracles] Override the application/* mimetypes to text/plain to avoid triggering a refusal for PDF files --- heracles-resources/feature/src/main/features/feature.json | 6 +++++- .../main/java/io/uhndata/cards/s3export/S3DataStore.java | 5 ++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/heracles-resources/feature/src/main/features/feature.json b/heracles-resources/feature/src/main/features/feature.json index 70d086063c..478375ea36 100644 --- a/heracles-resources/feature/src/main/features/feature.json +++ b/heracles-resources/feature/src/main/features/feature.json @@ -86,7 +86,11 @@ ], "formatter": "rawFile", "fileNameFormat": "{resourcePath}", - "storage": "s3" + "storage": "s3", + "storageParameters": [ + "blockedApplicationMimeTypeWorkaround=true", + "chunkSizeInMB=5" + ] } } } diff --git a/modules/s3-export/src/main/java/io/uhndata/cards/s3export/S3DataStore.java b/modules/s3-export/src/main/java/io/uhndata/cards/s3export/S3DataStore.java index 8c1dc01b51..9a98ec7371 100644 --- a/modules/s3-export/src/main/java/io/uhndata/cards/s3export/S3DataStore.java +++ b/modules/s3-export/src/main/java/io/uhndata/cards/s3export/S3DataStore.java @@ -80,7 +80,10 @@ public void store(final InputStream contents, final long size, final String file try { final ObjectMetadata meta = new ObjectMetadata(); - 
meta.setContentType(mimetype);
+            // Some S3 buckets may forbid uploading "applications", so let's pretend that they're just plain text files
+            meta.setContentType(
+                "true".equals(getNamedParameter(config.storageParameters(), "blockedApplicationMimeTypeWorkaround"))
+                    && mimetype.startsWith("application/") ? "text/plain" : mimetype);
             if (size >= 0) {
                 meta.setContentLength(size);
             }