diff --git a/packs/core/io.py b/packs/core/io.py
index ed593ce..99dbe3e 100644
--- a/packs/core/io.py
+++ b/packs/core/io.py
@@ -1,11 +1,18 @@
 import os
 import pandas as pd
+import numpy as np
 import h5py
 import ast
 import configparser
+from contextlib import contextmanager
+
+from typing import Optional
+from typing import Generator
+from typing import Union
+from typing import Tuple
 
 from packs.types import types
 
@@ -105,3 +112,112 @@ def read_config_file(file_path : str) -> dict:
             arg_dict[key] = ast.literal_eval(config[section][key])
 
     return arg_dict
+
+
+@contextmanager
+def writer(path : str,
+           group : str,
+           overwrite : Optional[bool] = True) -> Generator:
+    '''
+    Outer function for a lazy h5 writer that will iteratively write to a dataset, with the formatting:
+        FILE.h5 -> GROUP/DATASET
+    Includes overwriting functionality, which will overwrite **GROUPS** at will if needed.
+
+    Parameters
+    ----------
+    path (str)       : File path
+    group (str)      : Group within the h5 file
+    overwrite (bool) : Boolean for overwriting previous dataset (OPTIONAL)
+
+    Yields
+    ------
+    write (func) : write function described in write()
+
+    Fixed size is for when you know the size of the output file, so you set the size
+    of the df beforehand, saving precious IO operation. The input then becomes a tuple
+    of (True, DF_SIZE, INDEX), otherwise its false.
+    '''
+    # open file if exists, create group or overwrite it
+    h5f = h5py.File(path, 'a')
+    try:
+        if overwrite:
+            if group in h5f:
+                del h5f[group]
+
+        gr = h5f.require_group(group)
+
+        def write(dataset : str,
+                  data : np.ndarray,
+                  fixed_size : Union[bool, Tuple[bool, int, int]] = False) -> None:
+            '''
+            Writes ndarray to dataset within group defined in writer().
+            Fixed size used to speed up writing, if True will
+            create a dataset of a fixed size rather than
+            increasing the size iteratively.
+
+            Parameters
+            ----------
+            dataset (str)  : Dataset name to write to
+            data (ndarray) : Data to write*
+            fixed_size (Union[bool, Tuple[bool, int, int]])
+                           : Method that's either enabled or disabled.
+                             False (disabled) -> Iteratively increases size of dataframe at runtime
+                             True  (enabled)  -> Requires Tuple containing
+                                                 (True, number of events, index to write to)
+            This method is best seen in action in `process_bin_WD1()`.
+            * Data should be in a numpy structured array format, as can be seen in WD1 and WD2 processing
+            '''
+            if fixed_size is False:
+                # create dataset if doesnt exist, if does make larger
+                if dataset in gr:
+                    dset = gr[dataset]
+                    dset.resize((dset.shape[0] + 1, *dset.shape[1:]))
+                    dset[-1] = data
+                else:
+                    max_shape = (None,) + data.shape
+                    dset = gr.require_dataset(dataset, shape = (1,) + data.shape,
+                                              maxshape = max_shape, dtype = data.dtype,
+                                              chunks = True)
+                    dset[0] = data
+            else:
+                index = fixed_size[2]
+                # dataset of fixed size
+                if dataset in gr:
+                    dset = gr[dataset]
+                else:
+                    # maxshape must be a tuple matching the dataset rank,
+                    # not a bare int
+                    dset = gr.require_dataset(dataset, shape = (fixed_size[1],) + data.shape,
+                                              maxshape = (fixed_size[1],) + data.shape,
+                                              dtype = data.dtype, chunks = True)
+                dset[index] = data
+
+        yield write
+
+    finally:
+        h5f.close()
+
+
+def reader(path : str,
+           group : str,
+           dataset : str) -> Generator:
+    '''
+    A lazy h5 reader that will iteratively read from a dataset, with the formatting:
+
+        FILE.H5 -> GROUP/DATASET
+
+    Parameters
+    ----------
+    path (str)    : File path
+    group (str)   : Group name within the h5 file
+    dataset (str) : Dataset name within the group
+
+    Returns
+    -------
+    row (generator) : Generator object that returns the next row from the dataset upon being called.
+    '''
+    with h5py.File(path, 'r') as h5f:
+        gr = h5f[group]
+        dset = gr[dataset]
+
+        for row in dset:
+            yield row
\ No newline at end of file
diff --git a/packs/tests/data/overwriter_test.h5 b/packs/tests/data/overwriter_test.h5
new file mode 100644
index 0000000..358d412
Binary files /dev/null and b/packs/tests/data/overwriter_test.h5 differ
diff --git a/packs/tests/io_test.py b/packs/tests/io_test.py
index 7ef7aa6..7be9d04 100644
--- a/packs/tests/io_test.py
+++ b/packs/tests/io_test.py
@@ -9,7 +9,8 @@
 from pytest import warns
 
 from packs.core.io import read_config_file
-
+from packs.core.io import reader
+from packs.core.io import writer
 
 def test_missing_config(tmp_path, MULE_dir):
     '''
@@ -18,6 +19,227 @@
     '''
     config_path = f'{tmp_path}/false_config.conf'
-    
+
     with raises(FileNotFoundError):
-        read_config_file(config_path)
\ No newline at end of file
+        read_config_file(config_path)
+
+
+def test_reader_writer(tmp_path):
+    '''
+    The simplest of tests, ensure the writer produces the expected output.
+    This test should be expanded to differing modifications when they are included
+    as fixtures. As this isn't the case yet, there is no need.
+    '''
+
+    file = tmp_path / 'writer_output_tmp.h5'
+
+    test_dtype = np.dtype([
+        ('int', int),
+        ('float', float),
+        ('bool', bool),
+        ('bigfloat', float),
+    ])
+
+    test_dataset = [np.array((0, 1.0, False, 25000.323232), dtype = test_dtype),
+                    np.array((1, 4.0, True, 23456789.321), dtype = test_dtype)]
+
+    # create the writer object
+    with writer(file, 'test_group', overwrite = True) as scribe:
+        # write something to it
+        scribe('test_dataset', test_dataset[0])
+        scribe('test_dataset', test_dataset[1])
+
+    # read it out; only the exhausting call sits inside raises(),
+    # so the data asserts cannot be skipped by an early StopIteration
+    scholar = reader(file, 'test_group', 'test_dataset')
+    assert next(scholar).tolist() == test_dataset[0].tolist()
+    assert next(scholar).tolist() == test_dataset[1].tolist()
+    with raises(StopIteration):
+        next(scholar)
+
+
+def test_writer_overwriting(tmp_path, data_dir):
+    '''
+    Testing the 'overwrite' flag works when true,
+    this test writes random data over the prior file and check
+    that the reader reads out exactly the same input, with no extra information
+    '''
+
+    file = f'{tmp_path}/overwriter_test.h5'
+    # generate junk data
+    test_dtype = np.dtype([
+        ('int', int),
+        ('float', float),
+        ('bool', bool),
+        ('bigfloat', float),
+    ])
+
+    test_data = [np.array((0, 1.0, False, 25000.323232), dtype = test_dtype),
+                 np.array((1, 4.0, True, 23456789.321), dtype = test_dtype)]
+
+    overwrite_data = [np.array((1, 2.0, True, 30123.323232), dtype = test_dtype),
+                      np.array((4, 5.0, False, 2.321), dtype = test_dtype)]
+
+    # write file with junk data
+    with writer(file, 'RAW', overwrite = True) as scribe:
+        for data in test_data:
+            scribe('rwf', data)
+    initial_data = []
+    # check the output is correct
+    for data in reader(file, 'RAW', 'rwf'):
+        initial_data.append(data)
+
+    # overwrite file with new junk data
+    with writer(file, 'RAW', overwrite = True) as scribe:
+        for data in overwrite_data:
+            scribe('rwf', data)
+    # readout of overwrite data
+    output_data = []
+    for data in reader(file, 'RAW', 'rwf'):
+        output_data.append(data)
+
+    # ensure that these two lists arent identical
+    assert not np.array_equal(np.array(output_data), np.array(initial_data))
+    # sanity check that the output is what you expect it is
+    assert np.array_equal(np.array(output_data), np.array(overwrite_data))
+
+
+def test_writer_not_overwriting(tmp_path, data_dir):
+    '''
+    Testing 'overwrite' flag when false,
+    this test writes random data to the end of the prior file
+    and checks that the reader reads out both new and old data
+    '''
+    file = f'{tmp_path}/overwriter_test_2.h5'
+    # generate junk data
+    test_dtype = np.dtype([
+        ('int', int),
+        ('float', float),
+        ('bool', bool),
+        ('bigfloat', float),
+    ])
+
+    test_data = [np.array((0, 1.0, False, 25000.323232), dtype = test_dtype),
+                 np.array((1, 4.0, True, 23456789.321), dtype = test_dtype)]
+
+    append_data = [np.array((1, 2.0, True, 30123.323232), dtype = test_dtype),
+                   np.array((4, 5.0, False, 2.321), dtype = test_dtype)]
+
+    total_data = test_data + append_data
+
+    # write file with junk data
+    with writer(file, 'RAW', overwrite = True) as scribe:
+        for data in test_data:
+            scribe('rwf', data)
+    initial_data = []
+    # check the output is correct
+    for data in reader(file, 'RAW', 'rwf'):
+        initial_data.append(data)
+
+    # append file with new junk data
+    with writer(file, 'RAW', overwrite = False) as scribe:
+        for data in append_data:
+            scribe('rwf', data)
+    # readout of overwrite data
+    output_data = []
+    for data in reader(file, 'RAW', 'rwf'):
+        output_data.append(data)
+
+    # ensure that these two lists are identical
+    assert np.array_equal(np.array(output_data), np.array(total_data))
+
+
+def test_writer_fixed_size_correct_output(tmp_path):
+    '''
+    provide a particular size of dataframe, see if it
+    returns as expected
+    '''
+
+    file = f'{tmp_path}/fixed_size_tester.h5'
+
+    test_dtype = np.dtype([
+        ('int', int),
+        ('float', float),
+        ('bool', bool),
+        ('bigfloat', float),
+    ])
+
+    test_data = [np.array((0, 1.0, False, 25000.323232), dtype = test_dtype),
+                 np.array((1, 4.0, True, 23456789.321), dtype = test_dtype),
+                 np.array((1, 2.0, True, 30123.323232), dtype = test_dtype),
+                 np.array((4, 5.0, False, 2.321), dtype = test_dtype)]
+
+    # write the data
+    with writer(file, 'RAW', overwrite = True) as scribe:
+        for i, data in enumerate(test_data):
+            scribe('rwf', data, (True, len(test_data), i))
+
+    # read and check the data
+    for data_in, data_out in zip(test_data, reader(file, 'RAW', 'rwf')):
+        assert data_in == data_out
+
+
+def test_writer_fixed_size_provided_incorrectly(tmp_path):
+    '''
+    if you provide a fixed size for the writer, then try and write to it
+    with more events than chosen
+    '''
+
+    file = f'{tmp_path}/fixed_size_tester_2.h5'
+
+    test_dtype = np.dtype([
+        ('int', int),
+        ('float', float),
+        ('bool', bool),
+        ('bigfloat', float),
+    ])
+
+    test_data = [np.array((0, 1.0, False, 25000.323232), dtype = test_dtype),
+                 np.array((1, 4.0, True, 23456789.321), dtype = test_dtype),
+                 np.array((1, 2.0, True, 30123.323232), dtype = test_dtype),
+                 np.array((4, 5.0, False, 2.321), dtype = test_dtype)]
+
+    # expect IndexError when reading out
+    with raises(IndexError):
+        with writer(file, 'RAW', overwrite = True) as scribe:
+            for i, data in enumerate(test_data):
+                scribe('rwf', data, (True, len(test_data)-1, i))
+
+
+@mark.parametrize('over_input', (True, False))
+def test_writer_check_group_exists_no_overwrite(tmp_path, over_input):
+    '''
+    generate an empty h5 file, and check that the
+    writer creates the groups as expected
+    '''
+    # generate empty file
+    file = f'{tmp_path}/empty_file_tester.h5'
+    with open(file, 'w') as f:
+        pass
+
+    test_dtype = np.dtype([
+        ('int', int),
+        ('float', float),
+        ('bool', bool),
+        ('bigfloat', float),
+    ])
+
+    test_data = [np.array((0, 1.0, False, 25000.323232), dtype = test_dtype),
+                 np.array((1, 4.0, True, 23456789.321), dtype = test_dtype)]
+
+    # generate the groups as expected
+    with writer(file, 'RAW', overwrite = over_input) as scribe:
+        for data in test_data:
+            scribe('rwf', data)
+
+    # check output
+    for data_in, data_out in zip(test_data, reader(file, 'RAW', 'rwf')):
+        assert data_in == data_out