diff --git a/compression_utils.py b/compression_utils.py
new file mode 100644
index 0000000000..2a26a1c9d2
--- /dev/null
+++ b/compression_utils.py
@@ -0,0 +1,137 @@
+"""LZMA2 (.xz) compression helpers built on the standard-library lzma module."""
+
+import lzma
+import os
+import shutil
+from contextlib import ExitStack
+from typing import BinaryIO, Union
+
+# Values accepted for the ``format`` argument, mapped to the liblzma
+# branch/call/jump (BCJ) filter placed in front of LZMA2. ``"none"`` selects
+# plain LZMA2 with no pre-filter.
+_BCJ_FILTERS = {
+    "none": None,
+    "x86": lzma.FILTER_X86,
+    "ia64": lzma.FILTER_IA64,
+    "arm": lzma.FILTER_ARM,
+    "armthumb": lzma.FILTER_ARMTHUMB,
+    "powerpc": lzma.FILTER_POWERPC,
+    "sparc": lzma.FILTER_SPARC,
+}
+
+# Files are streamed in pieces of this size so they never have to fit in memory.
+_CHUNK_SIZE = 1024 * 1024
+
+
+def _filter_chain(format: str, preset: int) -> list:
+    """Build the lzma filter chain for *format* and *preset*.
+
+    ``lzma.compress`` expects its own ``format`` argument to be one of the
+    integer ``lzma.FORMAT_*`` container constants, so the filter name cannot be
+    passed straight through; it is translated into a filter chain instead.
+
+    :raises ValueError: If *format* is not a key of ``_BCJ_FILTERS``.
+    """
+    try:
+        bcj_filter = _BCJ_FILTERS[format]
+    except (KeyError, TypeError):
+        supported = ", ".join(sorted(_BCJ_FILTERS))
+        raise ValueError(
+            f"Unsupported format {format!r}; expected one of: {supported}"
+        ) from None
+    # The preset rides on the LZMA2 filter because lzma rejects a top-level
+    # ``preset`` argument when ``filters`` is also given.
+    chain = [{"id": lzma.FILTER_LZMA2, "preset": preset}]
+    if bcj_filter is not None:
+        chain.insert(0, {"id": bcj_filter})
+    return chain
+
+
+def _as_binary_file(file, mode: str, stack: ExitStack):
+    """Return *file* as an open binary file object.
+
+    Paths are opened with *mode* and registered on *stack* so they are closed
+    when the stack unwinds. Anything else is assumed to already be a binary
+    file object and is returned untouched, leaving it open for the caller.
+    """
+    if isinstance(file, (str, bytes, os.PathLike)):
+        return stack.enter_context(open(file, mode))
+    return file
+
+
+def lzma2_compress(data: bytes, format: str = "x86", preset: int = 9) -> bytes:
+    """
+    Compress data into an .xz stream using the LZMA2 algorithm.
+
+    :param data: The data to compress.
+    :param format: The BCJ filter applied before LZMA2: "x86", "ia64", "arm",
+        "armthumb", "powerpc", "sparc", or "none". Default is "x86".
+    :param preset: The compression preset (0-9). Default is 9.
+    :return: The compressed data.
+    :raises ValueError: If format is not one of the names listed above.
+    :raises TypeError: If data is not a bytes-like object.
+    """
+    filters = _filter_chain(format, preset)
+    return lzma.compress(data, format=lzma.FORMAT_XZ, filters=filters)
+
+
+def lzma2_decompress(data: bytes) -> bytes:
+    """
+    Decompress data compressed with the LZMA2 algorithm.
+
+    :param data: The compressed data.
+    :return: The decompressed data.
+    :raises lzma.LZMAError: If data is corrupt or truncated.
+    """
+    return lzma.decompress(data)
+
+
+def lzma2_compress_file(
+    input_file: Union[str, BinaryIO],
+    output_file: Union[str, BinaryIO],
+    format: str = "x86",
+    preset: int = 9,
+) -> None:
+    """
+    Compress a file using the LZMA2 algorithm.
+
+    The input is streamed in chunks rather than read into memory at once. File
+    objects passed in are left open; files opened from a path are closed here.
+
+    :param input_file: The input file to compress. Can be a file path or a binary file object.
+    :param output_file: The output file to write the compressed data. Can be a file path or a binary file object.
+    :param format: The BCJ filter applied before LZMA2 (see lzma2_compress). Default is "x86".
+    :param preset: The compression preset (0-9). Default is 9.
+    :raises ValueError: If format is not a supported filter name.
+    """
+    # Validate format before the output is opened, so a bad argument cannot
+    # truncate an existing file.
+    filters = _filter_chain(format, preset)
+    with ExitStack() as stack:
+        source = _as_binary_file(input_file, "rb", stack)
+        sink = stack.enter_context(
+            lzma.open(output_file, "wb", format=lzma.FORMAT_XZ, filters=filters)
+        )
+        shutil.copyfileobj(source, sink, _CHUNK_SIZE)
+
+
+def lzma2_decompress_file(
+    compressed_file: Union[str, BinaryIO],
+    decompressed_file: Union[str, BinaryIO],
+) -> None:
+    """
+    Decompress a file compressed with the LZMA2 algorithm.
+
+    The data is streamed in chunks, so a corrupt input can leave a partially
+    written output behind. File objects passed in are left open; files opened
+    from a path are closed here.
+
+    :param compressed_file: The compressed file to decompress. Can be a file path or a binary file object.
+    :param decompressed_file: The output file to write the decompressed data. Can be a file path or a binary file object.
+    :raises lzma.LZMAError: If the compressed data is corrupt.
+    :raises EOFError: If the compressed data is truncated.
+    """
+    with ExitStack() as stack:
+        source = stack.enter_context(lzma.open(compressed_file, "rb"))
+        sink = _as_binary_file(decompressed_file, "wb", stack)
+        shutil.copyfileobj(source, sink, _CHUNK_SIZE)
diff --git a/tests/test_compression.py b/tests/test_compression.py
new file mode 100644
index 0000000000..760e0fd581
--- /dev/null
+++ b/tests/test_compression.py
@@ -0,0 +1,108 @@
+"""Tests for the LZMA2 helpers in compression_utils."""
+
+import io
+import lzma
+import sys
+
+import pytest
+
+from compression_utils import (
+    lzma2_compress,
+    lzma2_compress_file,
+    lzma2_decompress,
+    lzma2_decompress_file,
+)
+
+SMALL_DATA = b"This is a small test string"
+LARGE_DATA = b"This is a large test string, repeated many times over. " * 1000
+
+
+def compress_lzma2(data: bytes) -> bytes:
+    """
+    Compress data using the LZMA2 algorithm.
+
+    :param data: The data to compress.
+    :return: The compressed data.
+    """
+    return lzma2_compress(data)
+
+
+def decompress_lzma2(compressed_data: bytes) -> bytes:
+    """
+    Decompress data using the LZMA2 algorithm.
+
+    :param compressed_data: The compressed data.
+    :return: The decompressed data.
+    """
+    return lzma2_decompress(compressed_data)
+
+
+def calculate_compression_ratio(original_data: bytes, compressed_data: bytes) -> float:
+    """
+    Calculate the compression ratio of compressed_data compared to original_data.
+
+    :param original_data: The original data.
+    :param compressed_data: The compressed data.
+    :return: Original size divided by compressed size; above 1.0 means the data shrank.
+    """
+    return len(original_data) / len(compressed_data)
+
+
+def test_lzma2_compression():
+    # Round trip across input sizes, including the empty input.
+    for data in (b"", SMALL_DATA, LARGE_DATA):
+        assert decompress_lzma2(compress_lzma2(data)) == data
+
+    # Only the repetitive payload is expected to shrink: on a few dozen bytes
+    # the .xz container overhead outweighs any gain.
+    ratio = calculate_compression_ratio(LARGE_DATA, compress_lzma2(LARGE_DATA))
+    assert ratio > 10, "Compression ratio for large data is too low"
+
+
+def test_lzma2_filter_selection():
+    for name in ("none", "x86", "arm"):
+        packed = lzma2_compress(SMALL_DATA, format=name)
+        assert lzma2_decompress(packed) == SMALL_DATA
+
+    with pytest.raises(ValueError):
+        lzma2_compress(SMALL_DATA, format="bogus")
+
+
+def test_lzma2_error_handling():
+    compressed_data = compress_lzma2(LARGE_DATA)
+    with pytest.raises(lzma.LZMAError):
+        decompress_lzma2(compressed_data[:-1])
+
+    with pytest.raises(TypeError):
+        compress_lzma2("This is invalid input")
+
+
+def test_lzma2_file_round_trip(tmp_path):
+    source = tmp_path / "data.bin"
+    packed = tmp_path / "data.bin.xz"
+    restored = tmp_path / "restored.bin"
+    source.write_bytes(LARGE_DATA)
+
+    lzma2_compress_file(str(source), str(packed))
+    lzma2_decompress_file(str(packed), str(restored))
+    assert restored.read_bytes() == LARGE_DATA
+
+    # Binary file objects are accepted too and are left open for the caller.
+    packed_buffer = io.BytesIO()
+    lzma2_compress_file(io.BytesIO(SMALL_DATA), packed_buffer)
+    restored_buffer = io.BytesIO()
+    lzma2_decompress_file(io.BytesIO(packed_buffer.getvalue()), restored_buffer)
+    assert restored_buffer.getvalue() == SMALL_DATA
+
+
+def test_lzma2_memory_usage():
+    # Best-effort check that may not be accurate on all systems: peak RSS is
+    # only available on Unix and never decreases within a process.
+    resource = pytest.importorskip("resource")
+    # ru_maxrss is reported in bytes on macOS and in kilobytes on Linux.
+    unit = 1 if sys.platform == "darwin" else 1024
+    memory_usage_before = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
+    compress_lzma2(LARGE_DATA * 100)
+    memory_usage_after = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
+    growth = (memory_usage_after - memory_usage_before) * unit
+    assert growth < 1024 ** 3, "Memory usage during compression is too high"