Skip to content

Commit

Permalink
Implement sidecar streaming read to accommodate the tracking report d…
Browse files Browse the repository at this point in the history
…igest computing
  • Loading branch information
yma96 committed Feb 8, 2023
1 parent 5c36f5a commit c5f9007
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import javax.inject.Inject;
import javax.ws.rs.core.Response;
import javax.xml.bind.DatatypeConverter;
import java.io.IOException;
import java.io.InputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
Expand Down Expand Up @@ -180,25 +179,10 @@ private Response convertProxyResp( okhttp3.Response resp, HttpMethod method, Tra
builder.header( header.getFirst(), header.getSecond() );
}
} );
if ( resp.body() != null && entry != null )
{
byte[] bytes = new byte[0];
try
{
bytes = resp.body().bytes();
}
catch ( IOException e )
{
logger.error( "Failed to read bytes from okhttp response", e );
}
entry.setSize( (long) bytes.length );
String[] headers = resp.header( "indy-origin" ).split( ":" );
entry.setOriginUrl( "http://" + proxyConfiguration.getServices().iterator().next().host + "/api/content/"
+ headers[0] + "/" + headers[1] + "/" + headers[2] + entry.getPath() );
updateMessageDigest( bytes, entry );
reportService.appendDownload( entry );
}
builder.entity( new ProxyStreamingOutput( resp.body().byteStream(), otel ) );
String indyOrigin = resp.header( "indy-origin" );
String serviceOrigin = "http://" + proxyConfiguration.getServices().iterator().next().host;
builder.entity( new ProxyStreamingOutput( resp.body(), entry, serviceOrigin, indyOrigin, reportService,
otel ) );
return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,52 +16,134 @@
package org.commonjava.util.sidecar.util;

import io.opentelemetry.api.trace.Span;
import org.apache.commons.io.IOUtils;
import okhttp3.ResponseBody;
import okio.BufferedSource;
import org.apache.commons.io.output.CountingOutputStream;
import org.commonjava.util.sidecar.model.TrackedContentEntry;
import org.commonjava.util.sidecar.services.ReportService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.ws.rs.core.StreamingOutput;
import javax.xml.bind.DatatypeConverter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;

public class ProxyStreamingOutput
implements StreamingOutput
{
private final Logger logger = LoggerFactory.getLogger( getClass() );

private final InputStream bodyStream;
private static final String MD5 = "MD5";

private static final String SHA1 = "SHA-1";

private static final String SHA256 = "SHA-256";

private static final String[] DIGESTS = { MD5, SHA1, SHA256 };

private static final long bufSize = 10 * 1024 * 1024;

private final ResponseBody responseBody;

private final TrackedContentEntry entry;

private final String serviceOrigin;

private final String indyOrigin;

private final ReportService reportService;

private final OtelAdapter otel;

public ProxyStreamingOutput( InputStream bodyStream, OtelAdapter otel )
private final Map<String, MessageDigest> digests = new HashMap<>();

public ProxyStreamingOutput( ResponseBody responseBody, TrackedContentEntry entry, String serviceOrigin,
String indyOrigin, ReportService reportService, OtelAdapter otel )
{
this.bodyStream = bodyStream;
this.responseBody = responseBody;
this.entry = entry;
this.serviceOrigin = serviceOrigin;
this.indyOrigin = indyOrigin;
this.reportService = reportService;
this.otel = otel;

for ( String key : DIGESTS )
{
try
{
digests.put( key, MessageDigest.getInstance( key ) );
}
catch ( NoSuchAlgorithmException e )
{
logger.warn( "Bytes hash calculation failed for request. Cannot get digest of type: {}", key );
}
}
}

@Override
public void write( OutputStream output ) throws IOException
{
if ( bodyStream != null )
if ( responseBody != null )
{
try
try (CountingOutputStream cout = new CountingOutputStream( output ))
{
OutputStream out = output;
CountingOutputStream cout = new CountingOutputStream( out );
out = cout;
logger.trace( "Copying from: {} to: {}", bodyStream, out );
IOUtils.copy( bodyStream, out );

OutputStream out = cout;
BufferedSource peed = responseBody.source().peek();
while ( !peed.exhausted() )
{
byte[] bytes;
if ( peed.request( bufSize ) )
{
bytes = peed.readByteArray(
bufSize ); // byteCount bytes will be removed from current buffer after read
}
else
{
bytes = peed.readByteArray();
}
out.write( bytes );
if ( entry != null )
{
digests.values().forEach( d -> d.update( bytes ) );
}
}
out.flush();
if ( otel.enabled() )
{
Span.current().setAttribute( "response.content_length", cout.getByteCount() );
}
if ( entry != null )
{
entry.setSize( cout.getByteCount() );
String[] headers = indyOrigin.split( ":" );
entry.setOriginUrl(
serviceOrigin + "/api/content/" + headers[0] + "/" + headers[1] + "/" + headers[2]
+ entry.getPath() );
if ( digests.containsKey( MD5 ) )
entry.setMd5( DatatypeConverter.printHexBinary( digests.get( MD5 ).digest() ).toLowerCase() );

if ( digests.containsKey( SHA1 ) )
entry.setSha1( DatatypeConverter.printHexBinary( digests.get( SHA1 ).digest() ).toLowerCase() );

if ( digests.containsKey( SHA256 ) )
entry.setSha256( DatatypeConverter.printHexBinary( digests.get( SHA256 ).digest() )
.toLowerCase() );

reportService.appendDownload( entry );
}
}
finally
{
closeBodyStream( bodyStream );
if ( responseBody == null )
{
return;
}
responseBody.close();
}
}
else
Expand All @@ -72,26 +154,4 @@ public void write( OutputStream output ) throws IOException
}
}
}

private void closeBodyStream( InputStream is )
{
if ( is == null )
{
return;
}

try
{
is.close();
}
catch ( IOException e )
{
if ( otel.enabled() )
{
Span.current().setAttribute( "body.ignored_error_class", e.getClass().getSimpleName() );
Span.current().setAttribute( "body.ignored_error_class", e.getMessage() );
}
logger.trace( "Failed to close body stream in proxy response.", e );
}
}
}

0 comments on commit c5f9007

Please sign in to comment.