-
-
Notifications
You must be signed in to change notification settings - Fork 23
/
Copy pathucodecs.py
66 lines (51 loc) · 2.05 KB
/
ucodecs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#######################################################################
# Copyright (c) 2019-present, Blosc Development Team <blosc@blosc.org>
# All rights reserved.
#
# This source code is licensed under a BSD-style license (found in the
# LICENSE file in the root directory of this source tree)
#######################################################################
# This shows how to implement a user-defined codec in pure Python
import sys
import numpy as np
import blosc2
# Shape of the demo dataset: number of chunks and items per chunk
nchunks = 2
chunk_len = 20_000
# Item type used by the codec functions and by the generated data
dtype = np.dtype("int32")
# Define encoder and decoder functions
def encoder1(input, output, meta, schunk):
    """Encode a block as ``(start, step)`` when it holds an arithmetic sequence.

    The input bytes are reinterpreted as items of the module-level ``dtype``.
    When every pair of consecutive items differs by the same step, only the
    first item and the step are written to ``output`` (4 bytes each, native
    byte order, step stored as a signed integer).

    Parameters
    ----------
    input
        Buffer with the uncompressed block; must support ``.view(dtype)``
        and slicing (presumably a NumPy uint8 array -- confirm with the
        Blosc2 documentation).
    output
        Writable buffer that receives the encoded bytes.
    meta
        Codec meta value; unused here.
    schunk
        Owning super-chunk; unused here.

    Returns
    -------
    int
        8 when the block was encoded as ``(start, step)``; 0 when it is not
        compressible, which tells Blosc2 to do a memcpy instead.
    """
    nd_input = input.view(dtype)

    # A step needs at least two items, and the result needs 8 bytes of room
    if nd_input.size < 2 or len(output) < 8:
        return 0

    # Widen before subtracting so that differences cannot wrap around
    deltas = np.diff(nd_input.astype(np.int64))
    step = int(deltas[0])
    if np.any(deltas != step):
        # Not an arange: not compressible, tell Blosc2 to do a memcpy
        return 0

    try:
        # signed=True so that descending sequences (negative step) work too
        step_bytes = step.to_bytes(4, sys.byteorder, signed=True)
    except OverflowError:
        # The step does not fit in the 4 bytes the decoder reads back
        return 0

    output[0:4] = input[0:4]  # start
    output[4:8] = list(step_bytes)  # step
    return 8
def decoder1(input, output, meta, schunk):
    """Rebuild an arithmetic sequence from its encoded ``(start, step)`` pair.

    Only the arange case has to be handled here; other cases are handled
    by Blosc2.

    Parameters
    ----------
    input
        Buffer holding the encoded block; viewed as items of the
        module-level ``dtype``, item 0 is the start and item 1 the step.
    output
        Writable buffer that is filled completely with the decoded items.
    meta
        Codec meta value; unused here.
    schunk
        Owning super-chunk; only its ``typesize`` attribute is read.

    Returns
    -------
    int
        Number of decoded items multiplied by ``schunk.typesize``.
    """
    nd_input = input.view(dtype)
    nd_output = output.view(dtype)
    start, step = nd_input[0], nd_input[1]
    # One vectorized expression instead of a Python-level loop over every item
    nd_output[:] = start + step * np.arange(nd_output.size, dtype=dtype)
    return nd_output.size * schunk.typesize
# Register codec (the id variable is named codec_id so that it does not
# shadow the id() builtin)
codec_name = "codec"
codec_id = 180
blosc2.register_codec(codec_name, codec_id, encoder1, decoder1)

# Set the compression and decompression parameters
# NOTE(review): nthreads=1 is presumably required because the codec callbacks
# are Python functions -- confirm against the Blosc2 documentation.
cparams = blosc2.CParams(
    typesize=dtype.itemsize,
    codec=codec_id,
    nthreads=1,
    filters=[blosc2.Filter.NOFILTER],
    filters_meta=[0],
)
dparams = blosc2.DParams(nthreads=1)

# Create SChunk and fill it with data
nitems = chunk_len * nchunks
data = np.arange(0, nitems, 1, dtype=dtype)
schunk = blosc2.SChunk(
    chunksize=chunk_len * dtype.itemsize,
    data=data,
    cparams=cparams,
    dparams=dparams,
)

# Check data can be decompressed correctly
out = np.empty(nitems, dtype=dtype)
schunk.get_slice(0, nitems, out=out)
assert np.array_equal(data, out)