Skip to content

Commit

Permalink
Refactored compression and decompression logic to delegate stream han…
Browse files Browse the repository at this point in the history
…dling to Apache Commons Compress. Removed redundant buffer management, simplified content encoding methods, and updated documentation. Addressed feedback regarding thread safety and alignment with existing API standards.
  • Loading branch information
arturobernalg committed Sep 20, 2024
1 parent 48de43f commit 300da7f
Show file tree
Hide file tree
Showing 15 changed files with 146 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -639,8 +639,8 @@ public void handle(final ClassicHttpRequest request,
Assertions.assertEquals(new URIBuilder().setHttpHost(target).setPath("/random/100").build(),
reqWrapper.getUri());

assertThat(values.poll(), CoreMatchers.equalTo("snappy-raw, snappy-framed, xz, bzip2, lz4-framed, deflate64, br, lzma, zstd, lz4-block, deflate, gz, z, pack200"));
assertThat(values.poll(), CoreMatchers.equalTo("snappy-raw, snappy-framed, xz, bzip2, lz4-framed, deflate64, br, lzma, zstd, lz4-block, deflate, gz, z, pack200"));
assertThat(values.poll(), CoreMatchers.equalTo("snappy-raw, xz, snappy-framed, bzip2, lz4-framed, deflate64, br, lzma, zstd, lz4-block, gz, deflate, z, pack200"));
assertThat(values.poll(), CoreMatchers.equalTo("snappy-raw, xz, snappy-framed, bzip2, lz4-framed, deflate64, br, lzma, zstd, lz4-block, gz, deflate, z, pack200"));
assertThat(values.poll(), CoreMatchers.nullValue());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
*
* @see GzipDecompressingEntity
* @since 5.2
* @deprecated
* @deprecated Use {@link CompressorFactory} for handling Brotli decompression.
*/
@Deprecated
public class BrotliDecompressingEntity extends DecompressingEntity {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
* {@link InputStreamFactory} for handling Brotli Content Coded responses.
*
* @since 5.2
* @deprecated
* @deprecated Use {@link CompressorFactory} for handling Brotli compression.
*/
@Deprecated
@Contract(threading = ThreadingBehavior.STATELESS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.commons.compress.compressors.CompressorException;
import org.apache.hc.core5.http.HttpEntity;
import org.apache.hc.core5.http.io.entity.HttpEntityWrapper;
import org.apache.hc.core5.util.Args;
Expand All @@ -45,7 +46,7 @@
* reading the content directly through {@link #getContent()} as the content is always compressed
* during write operations.</p>
*
* @since 5.4
* @since 5.5
*/
public class CompressingEntity extends HttpEntityWrapper {

Expand Down Expand Up @@ -76,16 +77,6 @@ public String getContentEncoding() {
return contentEncoding;
}

/**
* Returns the length of the wrapped entity. As the content is compressed,
* this will return the length of the wrapped entity's compressed data.
*
* @return the length of the compressed content, or {@code -1} if unknown.
*/
@Override
public long getContentLength() {
return super.getContentLength();
}

/**
* Returns whether the entity is chunked. This is determined by the wrapped entity.
Expand All @@ -97,6 +88,7 @@ public boolean isChunked() {
return super.isChunked();
}


/**
* This method is unsupported because the content is meant to be compressed during the
* {@link #writeTo(OutputStream)} operation.
Expand All @@ -121,7 +113,12 @@ public void writeTo(final OutputStream outStream) throws IOException {
Args.notNull(outStream, "Output stream");

// Get the compressor based on the specified content encoding
final OutputStream compressorStream = CompressorFactory.INSTANCE.getCompressorOutputStream(contentEncoding, outStream);
final OutputStream compressorStream;
try {
compressorStream = CompressorFactory.INSTANCE.getCompressorOutputStream(contentEncoding, outStream);
} catch (final CompressorException e) {
throw new IOException("Error initializing decompression stream", e);
}

if (compressorStream != null) {
// Write compressed data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,14 @@

import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import org.apache.commons.compress.compressors.CompressorException;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.commons.compress.compressors.deflate.DeflateCompressorInputStream;
import org.apache.commons.compress.compressors.deflate.DeflateParameters;
Expand All @@ -62,6 +61,8 @@
* <p>
* This class is thread-safe and uses {@link AtomicReference} to cache the available input and output stream providers.
* </p>
*
* @since 5.5
*/
public class CompressorFactory {

Expand All @@ -76,22 +77,13 @@ public class CompressorFactory {
private final AtomicReference<Set<String>> outputProvidersCache = new AtomicReference<>();
private final Map<String, String> formattedNameCache = new ConcurrentHashMap<>();

private static final Map<String, String> COMPRESSION_ALIASES;
static {
final Map<String, String> aliases = new HashMap<>();
aliases.put("gzip", "gz");
aliases.put("x-gzip", "gz");
aliases.put("compress", "z");
COMPRESSION_ALIASES = Collections.unmodifiableMap(aliases);
}

/**
* Returns a set of available input stream compression providers.
*
* @return a set of available input stream compression providers in lowercase.
*/
public Set<String> getAvailableInputProviders() {
return getAvailableProviders(inputProvidersCache, false);
return inputProvidersCache.updateAndGet(existing -> existing != null ? existing : fetchAvailableInputProviders());
}

/**
Expand All @@ -100,7 +92,7 @@ public Set<String> getAvailableInputProviders() {
* @return a set of available output stream compression providers in lowercase.
*/
public Set<String> getAvailableOutputProviders() {
return getAvailableProviders(outputProvidersCache, true);
return outputProvidersCache.updateAndGet(existing -> existing != null ? existing : fetchAvailableOutputProviders());
}

/**
Expand All @@ -119,29 +111,35 @@ public String getFormattedName(final String name) {
return null;
}
final String lowerCaseName = name.toLowerCase(Locale.ROOT);
return formattedNameCache.computeIfAbsent(lowerCaseName, key -> COMPRESSION_ALIASES.getOrDefault(key, key));
return formattedNameCache.computeIfAbsent(lowerCaseName, key -> {
if ("gzip".equals(key) || "x-gzip".equals(key)) {
return "gz";
} else if ("compress".equals(key)) {
return "z";
}
return key;
});
}


/**
* Creates an input stream for the specified compression format and decompresses the provided input stream.
* <p>
* This method uses the specified compression name to decompress the input stream and supports the "nowrap" option
* This method uses the specified compression name to decompress the input stream and supports the "noWrap" option
* for deflate streams.
* </p>
*
* @param name the compression format.
* @param inputStream the input stream to decompress.
* @param nowrap if true, disables the zlib header and trailer for deflate streams.
* @param noWrap if true, disables the zlib header and trailer for deflate streams.
* @return the decompressed input stream, or the original input stream if the format is not supported.
*/
public InputStream getCompressorInputStream(final String name, final InputStream inputStream, final boolean nowrap) {
public InputStream getCompressorInputStream(final String name, final InputStream inputStream, final boolean noWrap) throws CompressorException {
Args.notNull(inputStream, "InputStream");
Args.notNull(name, "name");

final String formattedName = getFormattedName(name);
return isSupported(formattedName, false)
? createCompressorInputStream(formattedName, inputStream, nowrap)
? createCompressorInputStream(formattedName, inputStream, noWrap)
: inputStream;
}

Expand All @@ -152,30 +150,15 @@ public InputStream getCompressorInputStream(final String name, final InputStream
* @param outputStream the output stream to compress.
* @return the compressed output stream, or the original output stream if the format is not supported.
*/
public OutputStream getCompressorOutputStream(final String name, final OutputStream outputStream) {
public OutputStream getCompressorOutputStream(final String name, final OutputStream outputStream) throws CompressorException {
final String formattedName = getFormattedName(name);
return isSupported(formattedName, true)
? createCompressorOutputStream(formattedName, outputStream)
: outputStream;
}

/**
* Compresses the provided HTTP entity using the specified compression format.
*
* @param entity the HTTP entity to compress.
* @param contentEncoding the compression format.
* @return a compressed {@link HttpEntity}, or {@code null} if the compression format is unsupported.
*/
public HttpEntity compressEntity(final HttpEntity entity, final String contentEncoding) {
Args.notNull(entity, "Entity");
Args.notNull(contentEncoding, "Content Encoding");
if (!isSupported(contentEncoding, true)) {
LOG.warn("Unsupported compression type: {}", contentEncoding);
return null;
}
return new CompressingEntity(entity, contentEncoding);
}


/**
* Decompresses the provided HTTP entity using the specified compression format.
*
Expand All @@ -192,103 +175,108 @@ public HttpEntity decompressEntity(final HttpEntity entity, final String content
*
* @param entity the HTTP entity to decompress.
* @param contentEncoding the compression format.
* @param nowrap if true, disables the zlib header and trailer for deflate streams.
* @param noWrap if true, disables the zlib header and trailer for deflate streams.
* @return a decompressed {@link HttpEntity}, or {@code null} if the compression format is unsupported.
*/
public HttpEntity decompressEntity(final HttpEntity entity, final String contentEncoding, final boolean nowrap) {
public HttpEntity decompressEntity(final HttpEntity entity, final String contentEncoding, final boolean noWrap) {
Args.notNull(entity, "Entity");
Args.notNull(contentEncoding, "Content Encoding");
if (!isSupported(contentEncoding, false)) {
LOG.warn("Unsupported decompression type: {}", contentEncoding);
return null;
}
return new DecompressEntity(entity, contentEncoding, nowrap);
return new DecompressEntity(entity, contentEncoding, noWrap);
}

/**
* Creates a compressor input stream for the given compression format and input stream.
* <p>
* This method handles the special case for deflate compression where the zlib header can be skipped.
* </p>
* Compresses the provided HTTP entity using the specified compression format.
*
* @param name the compression format.
* @param inputStream the input stream to decompress.
* @param nowrap if true, disables the zlib header and trailer for deflate streams.
* @return a decompressed input stream, or null if an error occurs.
* @param entity the HTTP entity to compress.
* @param contentEncoding the compression format.
* @return a compressed {@link HttpEntity}, or {@code null} if the compression format is unsupported.
*/
private InputStream createCompressorInputStream(final String name, final InputStream inputStream, final boolean nowrap) {
try {
if ("deflate".equalsIgnoreCase(name)) {
final DeflateParameters parameters = new DeflateParameters();
parameters.setWithZlibHeader(nowrap);
return new DeflateCompressorInputStream(inputStream, parameters);
}
return compressorStreamFactory.createCompressorInputStream(name, inputStream, true);
} catch (final Exception ex) {
LOG.warn("Could not create compressor {} input stream", name, ex);
public HttpEntity compressEntity(final HttpEntity entity, final String contentEncoding) {
Args.notNull(entity, "Entity");
Args.notNull(contentEncoding, "Content Encoding");
if (!isSupported(contentEncoding, true)) {
LOG.warn("Unsupported compression type: {}", contentEncoding);
return null;
}
return new CompressingEntity(entity, contentEncoding);
}

/**
* Determines if the specified compression format is supported for either input or output streams.
* Fetches the available input stream compression providers from Commons Compress.
*
* @param name the compression format.
* @param isOutput if true, checks if the format is supported for output; otherwise, checks for input support.
* @return true if the format is supported, false otherwise.
* @return a set of available input stream compression providers in lowercase.
*/
private boolean isSupported(final String name, final boolean isOutput) {
final String formattedName = getFormattedName(name);
return isOutput
? getAvailableOutputProviders().contains(formattedName)
: getAvailableInputProviders().contains(formattedName);
private Set<String> fetchAvailableInputProviders() {
final Set<String> inputNames = compressorStreamFactory.getInputStreamCompressorNames();
return inputNames.stream()
.map(String::toLowerCase)
.collect(Collectors.toSet());
}

/**
* Creates a compressor output stream for the given compression format and output stream.
* Fetches the available output stream compression providers from Commons Compress.
*
* @param name the compression format.
* @param outputStream the output stream to compress.
* @return a compressed output stream, or null if an error occurs.
* @return a set of available output stream compression providers in lowercase.
*/
private OutputStream createCompressorOutputStream(final String name, final OutputStream outputStream) {
try {
return compressorStreamFactory.createCompressorOutputStream(name, outputStream);
} catch (final Exception ex) {
LOG.warn("Could not create compressor {} output stream", name, ex);

return null;
}
private Set<String> fetchAvailableOutputProviders() {
final Set<String> outputNames = compressorStreamFactory.getOutputStreamCompressorNames();
return outputNames.stream()
.map(String::toLowerCase)
.collect(Collectors.toSet());
}

/**
* Retrieves the available compression providers for input or output streams.
* Creates a compressor input stream for the given compression format and input stream.
* <p>
* This method uses a cache to avoid redundant lookups and ensures the providers are formatted in lowercase.
* This method handles the special case for deflate compression where the zlib header can be optionally included.
* The noWrap parameter directly controls the behavior of the zlib header:
* - If noWrap is {@code true}, the deflate stream is processed without zlib headers (raw Deflate).
* - If noWrap is {@code false}, the deflate stream includes the zlib header.
* </p>
*
* @param cache the cache that stores the available providers.
* @param isOutput if true, retrieves available providers for output streams; otherwise, for input streams.
* @return a set of available compression providers in lowercase.
* @param name the compression format (e.g., "gzip", "deflate").
* @param inputStream the input stream to decompress; must not be {@code null}.
* @param noWrap if {@code true}, disables the zlib header and trailer for deflate streams (raw Deflate).
* @return a decompressed input stream, or {@code null} if an error occurs during stream creation.
* @throws CompressorException if an error occurs while creating the compressor input stream or if the compression format is unsupported.
*/
private Set<String> getAvailableProviders(final AtomicReference<Set<String>> cache, final boolean isOutput) {
return cache.updateAndGet(existing -> existing != null ? existing : fetchAvailableProviders(isOutput));
private InputStream createCompressorInputStream(final String name, final InputStream inputStream, final boolean noWrap) throws CompressorException {
if ("deflate".equalsIgnoreCase(name)) {
final DeflateParameters parameters = new DeflateParameters();
parameters.setWithZlibHeader(noWrap);
return new DeflateCompressorInputStream(inputStream, parameters);
}
return compressorStreamFactory.createCompressorInputStream(name, inputStream, true);
}

/**
* Fetches the available compression providers by querying the {@link CompressorStreamFactory}.
* Creates a compressor output stream for the given compression format and output stream.
*
* @param isOutput if true, fetches available providers for output streams; otherwise, for input streams.
* @return a set of available compression providers in lowercase.
* @param name the compression format.
* @param outputStream the output stream to compress.
* @return a compressed output stream, or null if an error occurs.
* @throws CompressorException if an error occurs while creating the compressor output stream.
*/
private Set<String> fetchAvailableProviders(final boolean isOutput) {
return (isOutput
? CompressorStreamFactory.findAvailableCompressorOutputStreamProviders()
: CompressorStreamFactory.findAvailableCompressorInputStreamProviders())
.keySet().stream()
.map(String::toLowerCase)
.collect(Collectors.collectingAndThen(Collectors.toSet(), Collections::unmodifiableSet));
private OutputStream createCompressorOutputStream(final String name, final OutputStream outputStream) throws CompressorException {
return compressorStreamFactory.createCompressorOutputStream(name, outputStream);
}

/**
* Determines if the specified compression format is supported for either input or output streams.
*
* @param name the compression format.
* @param isOutput if true, checks if the format is supported for output; otherwise, checks for input support.
* @return true if the format is supported, false otherwise.
*/
private boolean isSupported(final String name, final boolean isOutput) {
final Set<String> availableProviders = isOutput ? getAvailableOutputProviders() : getAvailableInputProviders();
return availableProviders.contains(name);
}
}



Loading

0 comments on commit 300da7f

Please sign in to comment.