From 989c6c165741d0ec20d7b6cf8611187a9e3a9adb Mon Sep 17 00:00:00 2001 From: John Casey Date: Mon, 8 Nov 2021 16:51:25 -0600 Subject: [PATCH] Implement large file streaming for proxy GET --- .../util/sidecar/services/ProxyService.java | 35 +---- .../sidecar/util/BufferStreamingOutput.java | 142 ++++++++++++++++++ 2 files changed, 149 insertions(+), 28 deletions(-) create mode 100644 src/main/java/org/commonjava/util/sidecar/util/BufferStreamingOutput.java diff --git a/src/main/java/org/commonjava/util/sidecar/services/ProxyService.java b/src/main/java/org/commonjava/util/sidecar/services/ProxyService.java index 3a2915c..92b37a1 100644 --- a/src/main/java/org/commonjava/util/sidecar/services/ProxyService.java +++ b/src/main/java/org/commonjava/util/sidecar/services/ProxyService.java @@ -31,6 +31,7 @@ import org.commonjava.util.sidecar.model.StoreType; import org.commonjava.util.sidecar.model.TrackedContentEntry; import org.commonjava.util.sidecar.model.TrackingKey; +import org.commonjava.util.sidecar.util.BufferStreamingOutput; import org.commonjava.util.sidecar.util.UrlUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +40,7 @@ import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import javax.ws.rs.core.Response; +import javax.ws.rs.core.StreamingOutput; import javax.xml.bind.DatatypeConverter; import java.io.IOException; import java.io.InputStream; @@ -300,34 +302,11 @@ private Response convertProxyResp( HttpResponse resp, TrackedContentEntr } ); if ( resp.body() != null ) { - byte[] bytes = resp.body().getBytes(); - if ( entry != null ) - { - entry.setSize( (long) bytes.length ); - String[] headers = resp.getHeader( "indy-origin" ).split( ":" ); - entry.setOriginUrl( - "http://" + proxyConfiguration.getServices().iterator().next().host + "/api/content/" - + headers[0] + "/" + headers[1] + "/" + headers[2] + entry.getPath() ); - MessageDigest message; - try - { - message = MessageDigest.getInstance( "MD5" ); - message.update( bytes ); - entry.setMd5( DatatypeConverter.printHexBinary( message.digest() ).toLowerCase() ); - message = MessageDigest.getInstance( "SHA-1" ); - message.update( bytes ); - entry.setSha1( DatatypeConverter.printHexBinary( message.digest() ).toLowerCase() ); - message = MessageDigest.getInstance( "SHA-256" ); - message.update( bytes ); - entry.setSha256( DatatypeConverter.printHexBinary( message.digest() ).toLowerCase() ); - reportService.appendDownload( entry ); - } - catch ( NoSuchAlgorithmException e ) - { - logger.warn( "Bytes hash calculation failed for request" ); - } - } - builder.entity( bytes ); + String indyOrigin = resp.getHeader( "indy-origin" ); + // FIXME: We need to account for HTTPS here...probably in the configuration itself. + String serviceOrigin = "http://" + proxyConfiguration.getServices().iterator().next().host; + StreamingOutput so = new BufferStreamingOutput( resp, entry, serviceOrigin, indyOrigin, reportService ); + builder.entity( so ); } return builder.build(); } diff --git a/src/main/java/org/commonjava/util/sidecar/util/BufferStreamingOutput.java b/src/main/java/org/commonjava/util/sidecar/util/BufferStreamingOutput.java new file mode 100644 index 0000000..9cf044e --- /dev/null +++ b/src/main/java/org/commonjava/util/sidecar/util/BufferStreamingOutput.java @@ -0,0 +1,142 @@ +package org.commonjava.util.sidecar.util; + +import io.vertx.mutiny.core.buffer.Buffer; +import io.vertx.mutiny.ext.web.client.HttpResponse; +import org.apache.commons.io.output.CountingOutputStream; +import org.apache.commons.io.output.TeeOutputStream; +import org.commonjava.util.sidecar.model.TrackedContentEntry; +import org.commonjava.util.sidecar.services.ReportService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.StreamingOutput; +import javax.xml.bind.DatatypeConverter; +import java.io.IOException; +import java.io.OutputStream; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Supplier; + +public class BufferStreamingOutput + implements StreamingOutput +{ + private static final String MD5 = "MD5"; + private static final String SHA1 = "SHA-1"; + private static final String SHA256 = "SHA-256"; + private static final String[] DIGESTS = { MD5, SHA1, SHA256 }; + + private static final int bufSize = 10 * 1024 * 1024; + + private final Logger logger = LoggerFactory.getLogger( getClass() ); + + private HttpResponse response; + + private TrackedContentEntry entry; + + private final String serviceOrigin; + + private final String indyOrigin; + + private ReportService reportService; + + private Map digests = new HashMap<>(); + + private Supplier cacheStreamSupplier; + + public BufferStreamingOutput( HttpResponse response, TrackedContentEntry entry, String serviceOrigin, + String indyOrigin, ReportService reportService ) + { + this.response = response; + this.entry = entry; + this.serviceOrigin = serviceOrigin; + this.indyOrigin = indyOrigin; + this.reportService = reportService; + for ( String key : DIGESTS ) + { + try + { + digests.put( key, MessageDigest.getInstance( key ) ); + } + catch ( NoSuchAlgorithmException e ) + { + logger.warn( "Bytes hash calculation failed for request. Cannot get digest of type: {}", key ); + } + } + } + + @Override + public void write( OutputStream output ) throws IOException, WebApplicationException + { + OutputStream cacheStream = null; + try(CountingOutputStream cout = new CountingOutputStream( output )) + { + OutputStream out = cout; + if ( cacheStreamSupplier != null ) + { + cacheStream = cacheStreamSupplier.get(); + if ( cacheStream != null ) + { + out = new TeeOutputStream( cacheStream, output ); + } + } + + Buffer buffer = response.bodyAsBuffer(); + int total = buffer.length(); + int transferred = 0; + while ( transferred < total ) + { + int next = bufSize < total ? bufSize : total; + byte[] bytes = buffer.getBytes( transferred, next ); + out.write( bytes ); + if ( entry != null ) + { + digests.values().forEach( d->d.update( bytes ) ); + } + + transferred = next; + } + out.flush(); + + if ( entry != null ) + { + entry.setSize( cout.getByteCount() ); + String[] headers = indyOrigin.split( ":" ); + entry.setOriginUrl( + serviceOrigin + "/api/content/" + + headers[0] + "/" + headers[1] + "/" + headers[2] + entry.getPath() ); + if ( digests.containsKey( MD5 )) + entry.setMd5( DatatypeConverter.printHexBinary( digests.get(MD5).digest() ).toLowerCase() ); + + if ( digests.containsKey( SHA1 )) + entry.setSha1( DatatypeConverter.printHexBinary( digests.get( SHA1 ).digest() ).toLowerCase() ); + + if ( digests.containsKey( SHA256 )) + entry.setSha256( DatatypeConverter.printHexBinary( digests.get(SHA256).digest() ).toLowerCase() ); + + reportService.appendDownload( entry ); + } + } + finally + { + if ( cacheStream != null ) + { + try + { + cacheStream.close(); + } + catch ( Exception e ) + { + logger.error( "Failed to close cache stream: " + e.getMessage(), e ); + } + } + } + } + + public void setCacheStreamSupplier( Supplier cacheStreamSupplier ) + { + this.cacheStreamSupplier = cacheStreamSupplier; + } +}