Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions packs/core/io.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
import os

import pandas as pd
import numpy as np

import h5py
import ast
import configparser

from contextlib import contextmanager

from typing import Optional
from typing import Generator
from typing import Union
from typing import Tuple

from packs.types import types

Expand Down Expand Up @@ -105,3 +112,112 @@ def read_config_file(file_path : str) -> dict:
arg_dict[key] = ast.literal_eval(config[section][key])

return arg_dict


@contextmanager
def writer(path : str,
           group : str,
           overwrite : Optional[bool] = True) -> Generator:
    '''
    Outer function for a lazy h5 writer that will iteratively write to a dataset, with the formatting:
    FILE.h5 -> GROUP/DATASET
    Includes overwriting functionality, which will overwrite **GROUPS** at will if needed.

    Parameters
    ----------
    path (str)      : File path
    group (str)     : Group within the h5 file
    overwrite(bool) : Boolean for overwriting previous dataset (OPTIONAL)

    Returns
    -------
    write (func) : write function described in write()

    Fixed size is for when you know the size of the output file, so you set the size
    of the df beforehand, saving precious IO operation. The input then becomes a tuple
    of (True, DF_SIZE, INDEX), otherwise its false.
    '''

    # open file if exists, create group or overwrite it
    h5f = h5py.File(path, 'a')
    try:
        if overwrite:
            if group in h5f:
                # delete the whole group so stale datasets cannot survive an overwrite
                del h5f[group]

        gr = h5f.require_group(group)

        def write(dataset : str,
                  data : np.ndarray,
                  fixed_size : Union[bool, Tuple[bool, int, int]] = False) -> None:
            '''
            Writes ndarray to dataset within group defined in writer().
            Fixed size used to speed up writing, if True will
            create a dataset of a fixed size rather than
            increasing the size iteratively.

            Parameters
            ----------
            dataset (str)   : Dataset name to write to
            data (ndarray)  : Data to write*
            fixed_size (Union[bool, Tuple[bool, int, int]])
                            : Method that's either enabled or disabled.
                False (disabled) -> Iteratively increases size of dataframe at runtime
                True (enabled)   -> Requires Tuple containing
                                    (True, number of events, index to write to)
                This method is best seen in action in `process_bin_WD1()`.

            * Data should be in a numpy structured array format, as can be seen in WD1 and WD2 processing
            '''
            # NOTE: the previous annotation Union[False, Tuple[True, int, int]]
            # used literal values as types, which makes typing.Union raise a
            # TypeError the first time writer() is called.
            if fixed_size is False:
                # create dataset if doesnt exist, if does make larger
                if dataset in gr:
                    dset = gr[dataset]
                    # grow by one row along axis 0 and write into the new slot
                    dset.resize((dset.shape[0] + 1, *dset.shape[1:]))
                    dset[-1] = data
                else:
                    # unlimited first axis so later writes can keep resizing
                    max_shape = (None,) + data.shape
                    dset = gr.require_dataset(dataset, shape = (1,) + data.shape,
                                              maxshape = max_shape, dtype = data.dtype,
                                              chunks = True)
                    dset[0] = data
            else:
                index = fixed_size[2]
                # dataset of fixed size
                if dataset in gr:
                    dset = gr[dataset]
                else:
                    # BUG FIX: maxshape must be a tuple with one entry per axis,
                    # matching shape's rank; a bare int is only valid for 1-D data
                    full_shape = (fixed_size[1],) + data.shape
                    dset = gr.require_dataset(dataset, shape = full_shape,
                                              maxshape = full_shape, dtype = data.dtype,
                                              chunks = True)
                dset[index] = data

        yield write

    finally:
        # always release the file handle, even if the caller's block raised
        h5f.close()


def reader(path : str,
           group : str,
           dataset : str) -> Generator:
    '''
    A lazy h5 reader that will iteratively read from a dataset, with the formatting:

    FILE.H5 -> GROUP/DATASET

    Parameters
    ----------
    path (str)    : File path
    group (str)   : Group name within the h5 file
    dataset (str) : Dataset name within the group

    Returns
    -------
    row (generator) : Generator object that returns the next row from the dataset upon being called.
    '''

    # the file stays open only while the generator is being consumed
    with h5py.File(path, 'r') as h5f:
        yield from h5f[group][dataset]
Binary file added packs/tests/data/overwriter_test.h5
Binary file not shown.
228 changes: 225 additions & 3 deletions packs/tests/io_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
from pytest import warns

from packs.core.io import read_config_file

from packs.core.io import reader
from packs.core.io import writer

def test_missing_config(tmp_path, MULE_dir):
'''
Expand All @@ -18,6 +19,227 @@ def test_missing_config(tmp_path, MULE_dir):
'''

config_path = f'{tmp_path}/false_config.conf'

with raises(FileNotFoundError):
read_config_file(config_path)
read_config_file(config_path)


def test_reader_writer(tmp_path):
    '''
    The simplest of tests, ensure the writer produces the expected output.
    This test should be expanded to differing modifications when they are included
    as fixtures. As this isn't the case yet, there is no need.
    '''

    file = tmp_path / 'writer_output_tmp.h5'

    test_dtype = np.dtype([
        ('int', int),
        ('float', float),
        ('bool', bool),
        ('bigfloat', float),
    ])

    test_dataset = [np.array((0, 1.0, False, 25000.323232), dtype = test_dtype),
                    np.array((1, 4.0, True, 23456789.321), dtype = test_dtype)]

    # create the writer object
    with writer(file, 'test_group', overwrite = True) as scribe:
        # write something to it
        scribe('test_dataset', test_dataset[0])
        scribe('test_dataset', test_dataset[1])

    scholar = reader(file, 'test_group', 'test_dataset')
    # BUG FIX: the data asserts must sit OUTSIDE the raises block — if the
    # reader raised StopIteration on the first next() (e.g. empty dataset),
    # pytest.raises would swallow it and the data checks would never run.
    assert next(scholar).tolist() == test_dataset[0].tolist()
    assert next(scholar).tolist() == test_dataset[1].tolist()
    # the dataset holds exactly two rows, so a third read must exhaust it
    with raises(StopIteration):
        next(scholar)


def test_writer_overwriting(tmp_path, data_dir):
    '''
    Testing the 'overwrite' flag works when true,
    this test writes random data over the prior file and check
    that the reader reads out exactly the same input, with no extra information
    '''

    file = f'{tmp_path}/overwriter_test.h5'

    # all junk records share one structured dtype
    row_dtype = np.dtype([
        ('int', int),
        ('float', float),
        ('bool', bool),
        ('bigfloat', float),
    ])

    first_batch = [np.array((0, 1.0, False, 25000.323232), dtype = row_dtype),
                   np.array((1, 4.0, True, 23456789.321), dtype = row_dtype)]

    second_batch = [np.array((1, 2.0, True, 30123.323232), dtype = row_dtype),
                    np.array((4, 5.0, False, 2.321), dtype = row_dtype)]

    # first pass: write the original junk data
    with writer(file, 'RAW', overwrite = True) as scribe:
        for row in first_batch:
            scribe('rwf', row)
    # snapshot of the file contents before overwriting
    initial_data = [row for row in reader(file, 'RAW', 'rwf')]

    # second pass: overwrite the group with fresh junk data
    with writer(file, 'RAW', overwrite = True) as scribe:
        for row in second_batch:
            scribe('rwf', row)
    # readout after the overwrite
    output_data = [row for row in reader(file, 'RAW', 'rwf')]

    # the two snapshots must differ...
    assert not np.array_equal(np.array(output_data), np.array(initial_data))
    # ...and the final contents must be exactly the overwrite batch
    assert np.array_equal(np.array(output_data), np.array(second_batch))


def test_writer_not_overwriting(tmp_path, data_dir):
    '''
    Testing 'overwrite' flag when false,
    this test writes random data to the end of the prior file
    and checks that the reader reads out both new and old data
    '''
    file = f'{tmp_path}/overwriter_test_2.h5'

    # all junk records share one structured dtype
    row_dtype = np.dtype([
        ('int', int),
        ('float', float),
        ('bool', bool),
        ('bigfloat', float),
    ])

    first_batch = [np.array((0, 1.0, False, 25000.323232), dtype = row_dtype),
                   np.array((1, 4.0, True, 23456789.321), dtype = row_dtype)]

    second_batch = [np.array((1, 2.0, True, 30123.323232), dtype = row_dtype),
                    np.array((4, 5.0, False, 2.321), dtype = row_dtype)]

    # expected final contents: original rows followed by appended rows
    expected = first_batch + second_batch

    # first session creates the group and writes the original rows
    with writer(file, 'RAW', overwrite = True) as scribe:
        for row in first_batch:
            scribe('rwf', row)
    # snapshot of the file before appending
    initial_data = [row for row in reader(file, 'RAW', 'rwf')]

    # second session appends rather than clobbering (overwrite = False)
    with writer(file, 'RAW', overwrite = False) as scribe:
        for row in second_batch:
            scribe('rwf', row)
    # readout after the append
    output_data = [row for row in reader(file, 'RAW', 'rwf')]

    # reader must return old rows followed by the new ones
    assert np.array_equal(np.array(output_data), np.array(expected))



def test_writer_fixed_size_correct_output(tmp_path):
    '''
    provide a particular size of dataframe, see if it
    returns as expected
    '''

    file = f'{tmp_path}/fixed_size_tester.h5'

    row_dtype = np.dtype([
        ('int', int),
        ('float', float),
        ('bool', bool),
        ('bigfloat', float),
    ])

    rows = [np.array((0, 1.0, False, 25000.323232), dtype = row_dtype),
            np.array((1, 4.0, True, 23456789.321), dtype = row_dtype),
            np.array((1, 2.0, True, 30123.323232), dtype = row_dtype),
            np.array((4, 5.0, False, 2.321), dtype = row_dtype)]

    # each write carries (enabled, total number of rows, target index)
    with writer(file, 'RAW', overwrite = True) as scribe:
        for index, row in enumerate(rows):
            scribe('rwf', row, (True, len(rows), index))

    # round-trip check, row by row
    for written, read_back in zip(rows, reader(file, 'RAW', 'rwf')):
        assert written == read_back

def test_writer_fixed_size_provided_incorrectly(tmp_path):
    '''
    if you provide a fixed size for the writer, then try and write to it
    with more events than chosen
    '''

    file = f'{tmp_path}/fixed_size_tester_2.h5'

    row_dtype = np.dtype([
        ('int', int),
        ('float', float),
        ('bool', bool),
        ('bigfloat', float),
    ])

    rows = [np.array((0, 1.0, False, 25000.323232), dtype = row_dtype),
            np.array((1, 4.0, True, 23456789.321), dtype = row_dtype),
            np.array((1, 2.0, True, 30123.323232), dtype = row_dtype),
            np.array((4, 5.0, False, 2.321), dtype = row_dtype)]

    # the dataset is declared one slot too small, so the final write
    # targets an out-of-bounds index and must raise IndexError
    with raises(IndexError):
        with writer(file, 'RAW', overwrite = True) as scribe:
            for index, row in enumerate(rows):
                scribe('rwf', row, (True, len(rows) - 1, index))


@mark.parametrize('over_input', (True, False))
def test_writer_check_group_exists_no_overwrite(tmp_path, over_input):
    '''
    generate an empty h5 file, and check that the
    writer creates the groups as expected
    '''
    # create a zero-byte file for the writer to open
    file = f'{tmp_path}/empty_file_tester.h5'
    with open(file, 'w'):
        pass

    row_dtype = np.dtype([
        ('int', int),
        ('float', float),
        ('bool', bool),
        ('bigfloat', float),
    ])

    rows = [np.array((0, 1.0, False, 25000.323232), dtype = row_dtype),
            np.array((1, 4.0, True, 23456789.321), dtype = row_dtype)]

    # the writer should build RAW/rwf from scratch, whatever the flag says
    with writer(file, 'RAW', overwrite = over_input) as scribe:
        for row in rows:
            scribe('rwf', row)

    # confirm the round trip
    for written, read_back in zip(rows, reader(file, 'RAW', 'rwf')):
        assert written == read_back