Skip to content

Add support for FLAC compression of 64bit integers. #9

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 46 additions & 12 deletions src/flacarray/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class FlacArray:
stream in the overall bytes array. The shape of the starting array corresponds
to the shape of the leading, un-compressed dimensions of the original array.

The input data is converted to 32bit integers. The "quanta" value is used
The input data is converted to 32bit or 64bit integers. The "quanta" value is used
for floating point data conversion and represents the floating point increment
for a single integer value. If quanta is None, each stream is scaled independently
based on its data range. If quanta is a scalar, all streams are scaled with the
Expand All @@ -42,20 +42,19 @@ class FlacArray:
The following rules specify the data conversion that is performed depending on
the input type:

* int32: No conversion.
* int32: No conversion. Compressed to single channel FLAC bytestream.

* int64: Subtract the integer closest to the mean, then truncate to lower
32 bits, and check that the higher bits were zero.
* int64: No conversion. Compressed to 2-channel (stereo) FLAC bytestream.

* float32: Subtract the mean and scale data based on the quanta value (see
above). Then round to nearest 32bit integer.

* float64: Subtract the mean and scale data based on the quanta value (see
above). Then round to nearest 32bit integer.
above). Then round to nearest 64bit integer.

After conversion to 32bit integers, each stream's data is separately compressed
into a sequence of FLAC bytes, which is appended to the bytestream. The offset in
bytes for each stream is recorded.
After conversion to integers, each stream's data is separately compressed into a
sequence of FLAC bytes, which is appended to the bytestream. The offset in bytes
for each stream is recorded.

A FlacArray is only constructed directly when making a copy. Use the class methods
to create FlacArrays from numpy arrays or on-disk representations.
Expand All @@ -71,6 +70,7 @@ def __init__(
shape=None,
global_shape=None,
compressed=None,
is_int64=None,
stream_starts=None,
stream_nbytes=None,
stream_offsets=None,
Expand All @@ -84,6 +84,7 @@ def __init__(
self._shape = copy.deepcopy(other._shape)
self._global_shape = copy.deepcopy(other._global_shape)
self._compressed = copy.deepcopy(other._compressed)
self._is_int64 = other._is_int64
self._stream_starts = copy.deepcopy(other._stream_starts)
self._stream_nbytes = copy.deepcopy(other._stream_nbytes)
self._stream_offsets = copy.deepcopy(other._stream_offsets)
Expand All @@ -97,6 +98,7 @@ def __init__(
self._shape = shape
self._global_shape = global_shape
self._compressed = compressed
self._is_int64 = is_int64
self._stream_starts = stream_starts
self._stream_nbytes = stream_nbytes
self._stream_offsets = stream_offsets
Expand Down Expand Up @@ -124,18 +126,25 @@ def _init_params(self):
if self._stream_offsets is not None:
if self._stream_gains is not None:
# This is floating point data
if self._stream_gains.dtype == np.dtype(np.float64):
if self._is_int64:
self._typestr = "float64"
else:
self._typestr = "float32"
else:
# This is int64 data
self._typestr = "int64"
raise RuntimeError("Offsets and gains must both be None or not None")
else:
self._typestr = "int32"
if self._is_int64:
self._typestr = "int64"
else:
self._typestr = "int32"

# Shapes of decompressed array

@property
def typestr(self):
"""A string representation of the original data type."""
return self._typestr

@property
def shape(self):
"""The shape of the local, uncompressed array."""
Expand Down Expand Up @@ -288,6 +297,7 @@ def __getitem__(self, key):
keep=keep,
first_stream_sample=first,
last_stream_sample=last,
is_int64=self._is_int64,
)
return arr

Expand All @@ -313,6 +323,9 @@ def __eq__(self, other):
if self._shape != other._shape:
log.debug(f"other shape {other._shape} != {self._shape}")
return False
if self._typestr != other._typestr:
log.debug(f"other typestr {other._typestr} != {self._typestr}")
return False
if self._global_shape != other._global_shape:
msg = f"other global_shape {other._global_shape} != {self._global_shape}"
log.debug(msg)
Expand Down Expand Up @@ -411,6 +424,7 @@ def to_array(
first_stream_sample=first_samp,
last_stream_sample=last_samp,
use_threads=use_threads,
is_int64=self._is_int64,
)
if keep is not None and keep_indices:
return (arr, indices)
Expand Down Expand Up @@ -447,6 +461,11 @@ def from_array(
global_shape = global_props["shape"]
mpi_dist = global_props["dist"]

if arr.dtype == np.dtype(np.int64) or arr.dtype == np.dtype(np.float64):
is_int64 = True
else:
is_int64 = False

# Compress our local piece of the array
compressed, starts, nbytes, offsets, gains = array_compress(
arr,
Expand All @@ -461,6 +480,7 @@ def from_array(
shape=arr.shape,
global_shape=global_shape,
compressed=compressed,
is_int64=is_int64,
stream_starts=starts,
stream_nbytes=nbytes,
stream_offsets=offsets,
Expand Down Expand Up @@ -489,6 +509,10 @@ def write_hdf5(self, hgrp):
None

"""
if self._is_int64:
n_channels = 2
else:
n_channels = 1
hdf5_write_compressed(
hgrp,
self._leading_shape,
Expand All @@ -500,6 +524,7 @@ def write_hdf5(self, hgrp):
self._stream_offsets,
self._stream_gains,
self._compressed,
n_channels,
self._compressed.nbytes,
self._global_nbytes,
self._global_proc_nbytes,
Expand Down Expand Up @@ -551,6 +576,7 @@ def read_hdf5(
local_shape,
global_shape,
compressed,
n_channels,
stream_starts,
stream_nbytes,
stream_offsets,
Expand All @@ -569,6 +595,7 @@ def read_hdf5(
shape=local_shape,
global_shape=global_shape,
compressed=compressed,
is_int64=(n_channels == 2),
stream_starts=stream_starts,
stream_nbytes=stream_nbytes,
stream_offsets=stream_offsets,
Expand All @@ -593,6 +620,10 @@ def write_zarr(self, zgrp):
None

"""
if self._is_int64:
n_channels = 2
else:
n_channels = 1
zarr_write_compressed(
zgrp,
self._leading_shape,
Expand All @@ -604,6 +635,7 @@ def write_zarr(self, zgrp):
self._stream_offsets,
self._stream_gains,
self._compressed,
n_channels,
self._compressed.nbytes,
self._global_nbytes,
self._global_proc_nbytes,
Expand Down Expand Up @@ -653,6 +685,7 @@ def read_zarr(
local_shape,
global_shape,
compressed,
n_channels,
stream_starts,
stream_nbytes,
stream_offsets,
Expand All @@ -671,6 +704,7 @@ def read_zarr(
shape=local_shape,
global_shape=global_shape,
compressed=compressed,
is_int64=(n_channels == 2),
stream_starts=stream_starts,
stream_nbytes=stream_nbytes,
stream_offsets=stream_offsets,
Expand Down
21 changes: 8 additions & 13 deletions src/flacarray/compress.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,18 @@
import numpy as np

from .libflacarray import encode_flac
from .utils import int64_to_int32, float_to_int32, function_timer
from .utils import float_to_int, function_timer


@function_timer
def array_compress(arr, level=5, quanta=None, precision=None, use_threads=False):
"""Compress a numpy array with optional floating point conversion.

If `arr` is an int32 array, the returned stream offsets and gains will be None.
if `arr` is an int64 array, the stream offsets will be the integer value subtracted
when converting to int32. Both float32 and float64 data will have floating point
offset and gain arrays returned.
If `arr` is an int64 array, the returned stream offsets and gains will be None and
the calling code is responsible for tracking that the compressed bytes are
associated with a 64bit stream. Both float32 and float64 data will have floating
point offset and gain arrays returned.

Args:
arr (numpy.ndarray): The input array data.
Expand Down Expand Up @@ -55,17 +56,11 @@ def array_compress(arr, level=5, quanta=None, precision=None, use_threads=False)
else:
dquanta = None

if arr.dtype == np.dtype(np.int32):
if arr.dtype == np.dtype(np.int32) or arr.dtype == np.dtype(np.int64):
(compressed, starts, nbytes) = encode_flac(arr, level, use_threads=use_threads)
return (compressed, starts, nbytes, None, None)
elif arr.dtype == np.dtype(np.int64):
idata, ioff = int64_to_int32(arr)
(compressed, starts, nbytes) = encode_flac(
idata, level, use_threads=use_threads
)
return (compressed, starts, nbytes, ioff, None)
elif arr.dtype == np.dtype(np.float64) or arr.dtype == np.dtype(np.float32):
idata, foff, gains = float_to_int32(arr, quanta=dquanta, precision=precision)
elif arr.dtype == np.dtype(np.float32) or arr.dtype == np.dtype(np.float64):
idata, foff, gains = float_to_int(arr, quanta=dquanta, precision=precision)
(compressed, starts, nbytes) = encode_flac(
idata, level, use_threads=use_threads
)
Expand Down
26 changes: 12 additions & 14 deletions src/flacarray/decompress.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import numpy as np

from .libflacarray import decode_flac
from .utils import int32_to_float, keep_select, function_timer, select_keep_indices
from .utils import int_to_float, keep_select, function_timer, select_keep_indices


@function_timer
Expand All @@ -19,6 +19,7 @@ def array_decompress_slice(
keep=None,
first_stream_sample=None,
last_stream_sample=None,
is_int64=False,
use_threads=False,
):
"""Decompress a slice of a FLAC encoded array and restore original data type.
Expand Down Expand Up @@ -52,6 +53,7 @@ def array_decompress_slice(
keep (array): Bool array of streams to keep in the decompression.
first_stream_sample (int): The first sample of every stream to decompress.
last_stream_sample (int): The last sample of every stream to decompress.
is_int64 (bool): If True, the compressed stream contains 64bit integers.
use_threads (bool): If True, use OpenMP threads to parallelize decoding.
This is only beneficial for large arrays.

Expand Down Expand Up @@ -79,27 +81,19 @@ def array_decompress_slice(
first_sample=first_stream_sample,
last_sample=last_stream_sample,
use_threads=use_threads,
is_int64=is_int64,
)
arr = int32_to_float(idata, offsets, gains)
arr = int_to_float(idata, offsets, gains)
else:
# This is int64 data
idata = decode_flac(
compressed,
starts,
nbytes,
stream_size,
first_sample=first_stream_sample,
last_sample=last_stream_sample,
use_threads=use_threads,
raise RuntimeError(
"When specifying offsets, you must also provide the gains"
)
ext_shape = offsets.shape + (1,)
arr = idata.astype(np.int64) + offsets.reshape(ext_shape)
else:
if stream_gains is not None:
raise RuntimeError(
"When specifying gains, you must also provide the offsets"
)
# This is int32 data
# This is integer data
arr = decode_flac(
compressed,
starts,
Expand All @@ -108,6 +102,7 @@ def array_decompress_slice(
first_sample=first_stream_sample,
last_sample=last_stream_sample,
use_threads=use_threads,
is_int64=is_int64,
)
return (arr, indices)

Expand All @@ -122,6 +117,7 @@ def array_decompress(
stream_gains=None,
first_stream_sample=None,
last_stream_sample=None,
is_int64=False,
use_threads=False,
):
"""Decompress a FLAC encoded array and restore original data type.
Expand All @@ -144,6 +140,7 @@ def array_decompress(
stream_gains (array): The array of gains, one per stream.
first_stream_sample (int): The first sample of every stream to decompress.
last_stream_sample (int): The last sample of every stream to decompress.
is_int64 (bool): If True, the compressed stream contains 64bit integers.
use_threads (bool): If True, use OpenMP threads to parallelize decoding.
This is only beneficial for large arrays.

Expand All @@ -161,6 +158,7 @@ def array_decompress(
keep=None,
first_stream_sample=first_stream_sample,
last_stream_sample=last_stream_sample,
is_int64=is_int64,
use_threads=use_threads,
)
return arr
Loading
Loading