Skip to content

Commit

Permalink
Merge pull request #1834 from data-team-uhn/CARDS-2593-s3-export-fixes
Browse files Browse the repository at this point in the history
CARDS-2593: Exporting very big files to S3 may fail
  • Loading branch information
sdumitriu authored Nov 14, 2024
2 parents a123b36 + eb807c5 commit 9662a39
Show file tree
Hide file tree
Showing 11 changed files with 114 additions and 18 deletions.
6 changes: 5 additions & 1 deletion heracles-resources/feature/src/main/features/feature.json
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,11 @@
],
"formatter": "rawFile",
"fileNameFormat": "{resourcePath}",
"storage": "s3"
"storage": "s3",
"storageParameters": [
"blockedApplicationMimeTypeWorkaround=true",
"chunkSizeInMB=5"
]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ private String getTargetFileName(final ResourceIdentifier identifier, final Zone
private void output(ResourceRepresentation input, String filename)
{
try {
this.store.store(input.getRepresentation(), filename, input.getMimeType(), this.config);
this.store.store(input.getRepresentation(), input.getRepresentationSize(), filename, input.getMimeType(),
this.config);
input.getDataContents().forEach(form -> {
LOGGER.info("Exported {}", form);
Metrics.increment(this.resolverFactory, "S3ExportedForms", 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,12 @@ public ResourceRepresentation format(final ResourceIdentifier what,
throws RepositoryException
{
Resource r = resolver.resolve(what.getExportPath() + ".csv");
byte[] bytes = r.adaptTo(CSVString.class).toString().getBytes(StandardCharsets.UTF_8);

return new ResourceRepresentation(what,
// FIXME Use streaming instead of building the whole CSV in memory
new ByteArrayInputStream(r.adaptTo(CSVString.class).toString().getBytes(StandardCharsets.UTF_8)),
new ByteArrayInputStream(bytes),
bytes.length,
"text/csv",
getContentsSummary(what, config, resolver));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ public ResourceRepresentation format(final ResourceIdentifier what,
{
Resource r = resolver.resolve(what.getExportPath() + ".json");
JsonObject serialization = r.adaptTo(JsonObject.class);
byte[] bytes = serialization.toString().getBytes(StandardCharsets.UTF_8);

return new ResourceRepresentation(what,
new ByteArrayInputStream(serialization.toString().getBytes(StandardCharsets.UTF_8)),
new ByteArrayInputStream(bytes),
bytes.length,
"application/json",
getContentsSummary(what, config, resolver));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.time.ZonedDateTime;
import java.util.Collections;

import javax.jcr.Binary;
import javax.jcr.Node;
import javax.jcr.RepositoryException;

Expand Down Expand Up @@ -51,8 +52,10 @@ public ResourceRepresentation format(final ResourceIdentifier what,
n = n.getNode("jcr:content");
}
if (n.isNodeType("nt:resource") && n.hasProperty("jcr:data")) {
final Binary content = n.getProperty("jcr:data").getBinary();
return new ResourceRepresentation(what,
n.getProperty("jcr:data").getBinary().getStream(),
content.getStream(),
content.getSize(),
n.hasProperty("jcr:mimeType") ? n.getProperty("jcr:mimeType").getString() : "application/octet-stream",
Collections.emptyList());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,12 @@ public ResourceRepresentation format(final ResourceIdentifier what,
throws RepositoryException
{
Resource r = resolver.resolve(what.getExportPath() + ".tsv");
byte[] bytes = r.adaptTo(CSVString.class).toString().getBytes(StandardCharsets.UTF_8);

return new ResourceRepresentation(what,
// FIXME Use streaming instead of building the whole CSV in memory
new ByteArrayInputStream(r.adaptTo(CSVString.class).toString().getBytes(StandardCharsets.UTF_8)),
new ByteArrayInputStream(bytes),
bytes.length,
"text/tab-separated-values",
getContentsSummary(what, config, resolver));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public String getName()
}

@Override
public void store(final InputStream contents, final String filename, final String mimetype,
public void store(final InputStream contents, final long size, final String filename, final String mimetype,
final ExportConfigDefinition config) throws IOException
{
final File targetFile =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,17 +93,21 @@ final class ResourceRepresentation

private final InputStream data;

private final long size;

private final List<String> dataContents;

private final String mimeType;

/**
 * Bundles the serialized form of an exported resource together with its metadata.
 *
 * @param identifier the identifier of the exported resource
 * @param data the serialized resource contents, as a stream
 * @param size the length of {@code data} in bytes; negative values appear to mean "unknown" —
 *            NOTE(review): confirm against the stores consuming this value
 * @param mimeType the MIME type of the serialized contents
 * @param dataContents labels describing the data included in this representation
 */
public ResourceRepresentation(final ResourceIdentifier identifier,
    final InputStream data,
    final long size,
    final String mimeType,
    final List<String> dataContents)
{
    this.identifier = identifier;
    this.mimeType = mimeType;
    this.dataContents = dataContents;
    this.data = data;
    this.size = size;
}
Expand All @@ -118,6 +122,11 @@ public InputStream getRepresentation()
return this.data;
}

/**
 * The size in bytes of the stream returned by {@code getRepresentation()}, as passed at construction time.
 *
 * @return the data size in bytes
 */
public long getRepresentationSize()
{
    return this.size;
}

public List<String> getDataContents()
{
return this.dataContents;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@

/**
 * A data store is the final step of an export pipeline: it receives the serialized representation of a resource
 * and persists it somewhere, e.g. on the local filesystem or in a remote object store.
 */
public interface DataStore extends DataPipelineStep
{
    /**
     * Persist a serialized resource.
     *
     * @param contents the data to store, as a stream
     * @param size the length of {@code contents} in bytes; NOTE(review): presumably a negative value means
     *            "unknown" — confirm with the implementations
     * @param filename the name under which the data should be stored
     * @param mimetype the MIME type of the data
     * @param config the export configuration, which may carry store-specific parameters
     * @throws IOException if persisting the data fails
     */
    void store(InputStream contents, long size, String filename, String mimetype, ExportConfigDefinition config)
        throws IOException;
}
8 changes: 8 additions & 0 deletions modules/s3-export/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@
<groupId>org.osgi</groupId>
<artifactId>org.osgi.service.component.annotations</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-osgi</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,35 @@

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang3.StringUtils;
import org.osgi.service.component.annotations.Component;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import io.uhndata.cards.export.ExportConfigDefinition;
import io.uhndata.cards.export.spi.DataStore;

@Component(immediate = true, service = DataStore.class)
@SuppressWarnings("checkstyle:ClassDataAbstractionCoupling")
public class S3DataStore implements DataStore
{
/**
 * Resolve a configuration value that may reference an environment variable: a value starting with {@code %ENV%}
 * is replaced by the value of the environment variable named by the rest of the string.
 *
 * @param value the raw configured value, may be {@code null}
 * @return the value itself, or the referenced environment variable's value (possibly {@code null} if the variable
 *         is not set) when the {@code %ENV%} prefix is present
 */
private String env(final String value)
{
    if (value != null && value.startsWith("%ENV%")) {
        return System.getenv(value.substring("%ENV%".length()));
    }
    return value;
}
private static final Logger LOGGER = LoggerFactory.getLogger(S3DataStore.class);

@Override
public String getName()
Expand All @@ -52,7 +58,7 @@ public String getName()
}

@Override
public void store(final InputStream contents, final String filename, final String mimetype,
public void store(final InputStream contents, final long size, final String filename, final String mimetype,
final ExportConfigDefinition config) throws IOException
{
final String s3EndpointUrl =
Expand All @@ -66,16 +72,75 @@ public void store(final InputStream contents, final String filename, final Strin
new EndpointConfiguration(s3EndpointUrl, s3EndpointRegion);
final AWSCredentials credentials = new BasicAWSCredentials(awsKey, awsSecret);
final AmazonS3 s3 = AmazonS3ClientBuilder.standard()
.withPayloadSigningEnabled(true)
.withEndpointConfiguration(endpointConfig)
.withPathStyleAccessEnabled(true)
.withCredentials(new AWSStaticCredentialsProvider(credentials))
.build();

try {
final ObjectMetadata meta = new ObjectMetadata();
meta.setContentType(mimetype);
s3.putObject(s3BucketName, filename, contents, meta);
// Some s3 buckets may forbid uploading "applications", so let's pretend that they're just plain text files
meta.setContentType(
"true".equals(getNamedParameter(config.storageParameters(), "blockedApplicationMimeTypeWorkaround"))
&& mimetype.startsWith("application/") ? "text/plain" : mimetype);
if (size >= 0) {
meta.setContentLength(size);
}

final List<PartETag> partETags = new ArrayList<>();

final InitiateMultipartUploadRequest initRequest =
new InitiateMultipartUploadRequest(s3BucketName, filename).withObjectMetadata(meta);
final InitiateMultipartUploadResult initResponse = s3.initiateMultipartUpload(initRequest);
long position = 0;
long partSize = getPartSize(config.storageParameters());
for (int partNumber = 1; position < size; ++partNumber) {
partSize = Math.min(partSize, (size - position));

UploadPartRequest uploadRequest = new UploadPartRequest()
.withBucketName(s3BucketName)
.withKey(filename)
.withUploadId(initResponse.getUploadId())
.withInputStream(contents)
.withPartNumber(partNumber)
.withPartSize(partSize);

UploadPartResult uploadResult = s3.uploadPart(uploadRequest);
partETags.add(uploadResult.getPartETag());

position += partSize;
}

CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest(s3BucketName, filename,
initResponse.getUploadId(), partETags);
s3.completeMultipartUpload(compRequest);
} catch (Exception e) {
throw new IOException("Failed to store file " + filename + " into S3 store " + getName(), e);
}
}

/**
 * Compute the size of each part to use for the S3 multipart upload. The size can be configured through the
 * {@code chunkSizeInMB} storage parameter; a missing, non-numeric, or non-positive value falls back to a
 * default of 10 MB. Note that S3 rejects parts smaller than 5 MB (except the last one), so very small
 * configured values may cause upload failures.
 *
 * @param parameters the configured storage parameters, in {@code key=value} format
 * @return the part size to use, in bytes
 */
private long getPartSize(final String[] parameters)
{
    final String sizeStr = getNamedParameter(parameters, "chunkSizeInMB");
    if (!StringUtils.isBlank(sizeStr)) {
        try {
            final int size = Integer.parseInt(sizeStr);
            if (size > 0) {
                // Use long arithmetic: size * 1024 * 1024 as int would overflow for sizes >= 2048 MB
                return size * 1024L * 1024L;
            }
        } catch (NumberFormatException e) {
            LOGGER.warn("Invalid chunk size configured for the S3 storage: {}", sizeStr);
        }
    }
    // Default part size: 10 MB
    return 10L * 1024L * 1024L;
}

/**
 * Dereference a configured value that may point to an environment variable. Values prefixed with {@code %ENV%}
 * are looked up in the process environment by the name following the prefix; anything else is returned unchanged.
 *
 * @param value the configured value, may be {@code null}
 * @return the literal value, or the environment variable's value (possibly {@code null}) when the prefix is used
 */
private String env(final String value)
{
    final String marker = "%ENV%";
    if (value == null || !value.startsWith(marker)) {
        return value;
    }
    return System.getenv(value.substring(marker.length()));
}
}

0 comments on commit 9662a39

Please sign in to comment.