Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions compression_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import lzma
from typing import BinaryIO, Union

def lzma2_compress(data: bytes, format: Union[str, int] = "x86", preset: int = 9) -> bytes:
    """
    Compress data into an XZ container using the LZMA2 algorithm.

    :param data: The data to compress.
    :param format: Name of the BCJ (branch/call/jump) filter placed in front of
        LZMA2 in the filter chain: "x86", "ia64", "arm", "armthumb", "powerpc"
        or "sparc" (case-insensitive). Use "none" for a plain LZMA2 chain. An
        integer is treated as an ``lzma.FORMAT_*`` container constant and is
        handed to ``lzma.compress`` unchanged. Default is "x86".
    :param preset: The compression preset. Default is 9.
    :return: The compressed data.
    :raises TypeError: If format is neither a string nor an integer.
    :raises ValueError: If format is a string that names no known filter.
    """
    # An integer is a container-format constant: keep the plain stdlib behavior.
    if isinstance(format, int):
        return lzma.compress(data, format=format, preset=preset)
    if not isinstance(format, str):
        raise TypeError(
            f"format must be a filter name or an lzma.FORMAT_* constant, "
            f"not {type(format).__name__}"
        )

    bcj_filters = {
        "x86": lzma.FILTER_X86,
        "ia64": lzma.FILTER_IA64,
        "arm": lzma.FILTER_ARM,
        "armthumb": lzma.FILTER_ARMTHUMB,
        "powerpc": lzma.FILTER_POWERPC,
        "sparc": lzma.FILTER_SPARC,
    }

    # LZMA2 must be the last filter in the chain; the preset travels with it
    # because lzma.compress rejects `preset` when `filters` is also given.
    chain = [{"id": lzma.FILTER_LZMA2, "preset": preset}]
    filter_name = format.lower()
    if filter_name != "none":
        if filter_name not in bcj_filters:
            raise ValueError(f"Unknown filter format: {format!r}")
        chain.insert(0, {"id": bcj_filters[filter_name]})

    return lzma.compress(data, format=lzma.FORMAT_XZ, filters=chain)

def lzma2_decompress(data: bytes) -> bytes:
    """
    Restore the original bytes from an LZMA-compressed payload.

    The payload is handed to ``lzma.decompress`` with its default settings.

    :param data: The compressed data.
    :return: The decompressed data.
    """
    return lzma.decompress(data)

def lzma2_compress_file(input_file: Union[str, BinaryIO], output_file: Union[str, BinaryIO], format: str = "x86", preset: int = 9) -> None:
    """
    Compress a file using LZMA2 algorithm.

    File objects passed in are read from / written to as-is and are left open
    for the caller to close; paths are opened and closed here.

    :param input_file: The input file to compress. Can be a file path or a binary file object.
    :param output_file: The output file to write the compressed data. Can be a file path or a binary file object.
    :param format: The format of the filter chain. Default is "x86".
    :param preset: The compression preset. Default is 9.
    """
    # Anything exposing read() is treated as an already-open binary stream.
    if hasattr(input_file, "read"):
        raw_data = input_file.read()
    else:
        with open(input_file, "rb") as f_in:
            raw_data = f_in.read()

    compressed_data = lzma2_compress(raw_data, format=format, preset=preset)

    # The destination is only opened once compression has succeeded, so a
    # failure never leaves a truncated output file behind, and compressing a
    # path onto itself does not wipe the input before it has been read.
    if hasattr(output_file, "write"):
        output_file.write(compressed_data)
    else:
        with open(output_file, "wb") as f_out:
            f_out.write(compressed_data)

def lzma2_decompress_file(compressed_file: Union[str, BinaryIO], decompressed_file: Union[str, BinaryIO]) -> None:
    """
    Decompress a file compressed with LZMA2 algorithm.

    File objects passed in are read from / written to as-is and are left open
    for the caller to close; paths are opened and closed here.

    :param compressed_file: The compressed file to decompress. Can be a file path or a binary file object.
    :param decompressed_file: The output file to write the decompressed data. Can be a file path or a binary file object.
    """
    # Anything exposing read() is treated as an already-open binary stream.
    if hasattr(compressed_file, "read"):
        compressed_data = compressed_file.read()
    else:
        with open(compressed_file, "rb") as f_in:
            compressed_data = f_in.read()

    decompressed_data = lzma2_decompress(compressed_data)

    # The destination is only opened once decompression has succeeded, so a
    # corrupt input never leaves a truncated output file behind.
    if hasattr(decompressed_file, "write"):
        decompressed_file.write(decompressed_data)
    else:
        with open(decompressed_file, "wb") as f_out:
            f_out.write(decompressed_data)
75 changes: 75 additions & 0 deletions tests/test_compression.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import io
import os
import resource
import shutil
import tempfile

import py7zr
import pytest

def test_lzma2_compression():
    """
    Round-trip, compression-ratio, error-handling and memory checks for the
    compress_lzma2 / decompress_lzma2 helpers.
    """
    # Test various input sizes
    small_data = b"This is a small test string"
    large_data = b"This is a large test string, repeated many times over. This is a large test string, repeated many times over."

    # Verify compression and decompression
    small_compressed = compress_lzma2(small_data)
    assert decompress_lzma2(small_compressed) == small_data

    large_compressed = compress_lzma2(large_data)
    assert decompress_lzma2(large_compressed) == large_data

    # Check compression ratio: each input is compared with its own compressed
    # form (a single reused variable would pair small_data with the large archive).
    # NOTE(review): the thresholds are kept as written; confirm they are
    # reachable once archive container overhead is included.
    compression_ratio = calculate_compression_ratio(small_data, small_compressed)
    assert compression_ratio > 0.5, "Compression ratio for small data is too low"

    compression_ratio = calculate_compression_ratio(large_data, large_compressed)
    assert compression_ratio > 0.8, "Compression ratio for large data is too low"

    # Validate error handling
    # NOTE(review): confirm that py7zr.exceptions defines ChecksumError and
    # that a truncated archive raises it.
    with pytest.raises(py7zr.exceptions.ChecksumError):
        decompress_lzma2(large_compressed[:-1])

    with pytest.raises(TypeError):
        compress_lzma2("This is invalid input")

    # Test memory efficiency and resource usage
    # (This is a best-effort test and may not be accurate on all systems)
    # ru_maxrss is a process-wide peak value, so the difference only shows
    # growth beyond the previous peak; its unit is presumably
    # platform-dependent -- TODO confirm for the target systems.
    memory_usage_before = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    compress_lzma2(large_data * 100)
    memory_usage_after = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    assert memory_usage_after - memory_usage_before < 1e6, "Memory usage during compression is too high"

def compress_lzma2(data: bytes) -> bytes:
    """
    Compress data into an in-memory 7z archive using the LZMA2 algorithm.

    The payload is stored as a single archive member named "data.bin".

    :param data: The data to compress.
    :return: The bytes of the resulting 7z archive.
    :raises TypeError: If data is not a bytes-like object.
    """
    # Reject text explicitly so callers get a TypeError, as the test expects.
    if not isinstance(data, (bytes, bytearray, memoryview)):
        raise TypeError(f"data must be bytes-like, not {type(data).__name__}")

    # Build the archive in memory: no fixed "data.7z" in the working
    # directory, hence nothing to clean up and no clashes between runs.
    lzma2_chain = [{"id": py7zr.FILTER_LZMA2, "preset": 9}]
    archive_buffer = io.BytesIO()
    with py7zr.SevenZipFile(archive_buffer, mode="w", filters=lzma2_chain) as z:
        z.writestr(data, "data.bin")
    return archive_buffer.getvalue()

def decompress_lzma2(compressed_data: bytes) -> bytes:
    """
    Decompress data using the LZMA2 algorithm.

    The input is expected to be a 7z archive containing a member named
    "data.bin"; that member's content is returned.

    :param compressed_data: The compressed data.
    :return: The decompressed data.
    """
    # The archive is read from memory and extracted into a private temporary
    # directory, which is removed even when extraction raises.
    with tempfile.TemporaryDirectory() as work_dir:
        with py7zr.SevenZipFile(io.BytesIO(compressed_data), mode="r") as z:
            z.extract(path=work_dir, targets=["data.bin"])
        with open(os.path.join(work_dir, "data.bin"), "rb") as f:
            return f.read()

def calculate_compression_ratio(original_data: bytes, compressed_data: bytes) -> float:
    """
    Report how many times larger the original payload is than its compressed form.

    :param original_data: The original data.
    :param compressed_data: The compressed data.
    :return: The length of original_data divided by the length of compressed_data.
    """
    original_size = len(original_data)
    compressed_size = len(compressed_data)
    return original_size / compressed_size