diff --git a/packs/core/core_utils.py b/packs/core/core_utils.py
index 4bb9ea2..c33b90c 100644
--- a/packs/core/core_utils.py
+++ b/packs/core/core_utils.py
@@ -12,3 +12,19 @@ def check_test(file):
         return True
     else:
         return False
+
+
+# THIS SHOULD BE MOVED ELSEWHERE
+class MalformedHeaderError(Exception):
+    '''
+    Error raised when two consecutive headers don't match up.
+    Created initially for WD1 processing, but should be back-ported to WD2.
+    '''
+
+    def __init__(self, header1, header2):
+        self.header1 = header1
+        self.header2 = header2
+
+    def __str__(self):
+        return (f"MalformedHeaderError: Headers don't match the expected pattern. "
+                f"Ensure the .dat file provided is formatted correctly.\n"
+                f"First Header: {self.header1}\nSecond Header: {self.header2}")
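Not part of the patch — a sketch of how the new exception is meant to surface, with hypothetical header values (six signed 4-byte ints, as documented in `process_bin_WD1` below):

```python
import numpy as np
from packs.core.core_utils import MalformedHeaderError

# two hypothetical WD1 headers: the event counter jumps by 2
# and the trigger time-tag goes backwards, so the pair is inconsistent
first  = np.array([2024, 0, 0, 0, 7, 1000], dtype = 'i')
second = np.array([2024, 0, 0, 0, 9,  900], dtype = 'i')

try:
    raise MalformedHeaderError(first, second)
except MalformedHeaderError as err:
    print(err)   # __str__ prints both headers for debugging
```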
diff --git a/packs/core/io.py b/packs/core/io.py
index 6c7b2d0..99dbe3e 100644
--- a/packs/core/io.py
+++ b/packs/core/io.py
@@ -1,9 +1,18 @@
+import os
+
 import pandas as pd
+import numpy as np
 import h5py
 import ast
 import configparser
+
+from contextlib import contextmanager
+
+from typing import Optional
+from typing import Generator
+from typing import Union
+from typing import Tuple
 
 from packs.types import types
 
@@ -87,6 +96,10 @@ def read_config_file(file_path : str) -> dict:
 
     # setup config parser
     config = configparser.ConfigParser()
 
+    if not os.path.exists(file_path):
+        raise FileNotFoundError(2, 'No such config file', file_path)
+
+    # read in the arguments, requiring the required ones
     config.read(file_path)
 
     arg_dict = {}
@@ -99,3 +112,112 @@ def read_config_file(file_path : str) -> dict:
             arg_dict[key] = ast.literal_eval(config[section][key])
 
     return arg_dict
+
+
+@contextmanager
+def writer(path : str,
+           group : str,
+           overwrite : Optional[bool] = True) -> Generator:
+    '''
+    Outer function for a lazy h5 writer that iteratively writes to a dataset, with the formatting:
+
+        FILE.h5 -> GROUP/DATASET
+
+    Includes overwriting functionality, which will overwrite **GROUPS** at will if needed.
+
+    Fixed size is for when you know the size of the output in advance: the dataset is
+    allocated up front, saving precious IO operations. The `fixed_size` input to write()
+    then becomes a tuple of (True, DF_SIZE, INDEX); otherwise it is False.
+
+    Parameters
+    ----------
+    path      (str)  : File path
+    group     (str)  : Group within the h5 file
+    overwrite (bool) : Boolean for overwriting a previous dataset (OPTIONAL)
+
+    Returns
+    -------
+    write (func) : write function described in write()
+    '''
+
+    # open the file, then create the group or overwrite it
+    h5f = h5py.File(path, 'a')
+    try:
+        if overwrite:
+            if group in h5f:
+                del h5f[group]
+
+        gr = h5f.require_group(group)
+
+        def write(dataset : str,
+                  data : np.ndarray,
+                  fixed_size : Union[bool, Tuple[bool, int, int]] = False) -> None:
+            '''
+            Writes an ndarray to a dataset within the group defined in writer().
+            Fixed size is used to speed up writing: when enabled it
+            creates a dataset of a fixed size rather than
+            increasing the size iteratively.
+
+            Parameters
+            ----------
+            dataset (str)     : Dataset name to write to
+            data    (ndarray) : Data to write*
+            fixed_size (Union[bool, Tuple[bool, int, int]])
+                              : Mode that is either enabled or disabled.
+                                False (disabled) -> iteratively increases the size of the dataset at runtime
+                                Tuple (enabled)  -> requires a tuple containing
+                                                    (True, number of events, index to write to)
+                                This mode is best seen in action in `process_bin_WD1()`.
+
+            * Data should be in a numpy structured array format, as can be seen in WD1 and WD2 processing
+            '''
+            if fixed_size is False:
+                # create the dataset if it doesn't exist; if it does, grow it by one row
+                if dataset in gr:
+                    dset = gr[dataset]
+                    dset.resize((dset.shape[0] + 1, *dset.shape[1:]))
+                    dset[-1] = data
+                else:
+                    max_shape = (None,) + data.shape
+                    dset = gr.require_dataset(dataset, shape = (1,) + data.shape,
+                                              maxshape = max_shape, dtype = data.dtype,
+                                              chunks = True)
+                    dset[0] = data
+            else:
+                index = fixed_size[2]
+                # dataset of fixed size
+                if dataset in gr:
+                    dset = gr[dataset]
+                else:
+                    dset = gr.require_dataset(dataset, shape = (fixed_size[1],) + data.shape,
+                                              maxshape = (fixed_size[1],) + data.shape,
+                                              dtype = data.dtype, chunks = True)
+                dset[index] = data
+
+        yield write
+
+    finally:
+        h5f.close()
+
+
+def reader(path : str,
+           group : str,
+           dataset : str) -> Generator:
+    '''
+    A lazy h5 reader that iteratively reads from a dataset, with the formatting:
+
+        FILE.h5 -> GROUP/DATASET
+
+    Parameters
+    ----------
+    path    (str) : File path
+    group   (str) : Group name within the h5 file
+    dataset (str) : Dataset name within the group
+
+    Returns
+    -------
+    row (generator) : Generator that yields the next row from the dataset each time it is called.
+    '''
+
+    with h5py.File(path, 'r') as h5f:
+        gr = h5f[group]
+        dset = gr[dataset]
+
+        for row in dset:
+            yield row
\ No newline at end of file
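Not part of the patch — a minimal usage sketch of the writer/reader pair added above. The file, group and dtype names are hypothetical; rows are numpy structured scalars, as the write() docstring asks for:

```python
import numpy as np
from packs.core.io import reader, writer

row_dtype = np.dtype([('event_number', int), ('timestamp', int)])  # hypothetical
rows = [np.array((i, 1000 + i), dtype = row_dtype) for i in range(3)]

# growable mode: each call resizes the dataset by one row
with writer('events.h5', 'RAW', overwrite = True) as write:
    for row in rows:
        write('event_info', row)

# fixed-size mode: allocate once, then write by index
with writer('events.h5', 'RAW', overwrite = True) as write:
    for i, row in enumerate(rows):
        write('event_info', row, (True, len(rows), i))

# lazy read-back, one row at a time; the file stays open
# only while the generator is being consumed
for row in reader('events.h5', 'RAW', 'event_info'):
    print(row)
```

Note that `overwrite = True` deletes the whole group before writing, so the two `with` blocks above each start from a clean slate.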
diff --git a/packs/proc/proc.py b/packs/proc/proc.py
index ec95240..6f4520c 100644
--- a/packs/proc/proc.py
+++ b/packs/proc/proc.py
@@ -2,6 +2,7 @@
 from packs.core.io import read_config_file
 
 from packs.proc.processing_utils import process_bin_WD2
+from packs.proc.processing_utils import process_bin_WD1
 from packs.core.core_utils import check_test
 
 def proc(config_file):
@@ -22,6 +23,8 @@ def proc(config_file):
         case 'decode':
             if conf_dict['wavedump_edition'] == 2:
                 process_bin_WD2(**arg_dict)
+            elif conf_dict['wavedump_edition'] == 1:
+                process_bin_WD1(**arg_dict)
             else:
                 raise RuntimeError(f"wavedump edition {conf_dict['wavedump_edition']} decoding isn't currently implemented.")
         case default:
diff --git a/packs/proc/processing_utils.py b/packs/proc/processing_utils.py
index 7102b49..9b84360 100644
--- a/packs/proc/processing_utils.py
+++ b/packs/proc/processing_utils.py
@@ -14,6 +14,8 @@
 # imports start from MULE/
 from packs.core.core_utils import flatten
+from packs.core.core_utils import MalformedHeaderError
+from packs.core.io import writer
 from packs.types import types
 
 """
@@ -369,13 +371,130 @@ def check_save_path(save_path : str,
     return save_path
 
 
+def process_event_lazy_WD1(file_object : BinaryIO,
+                           sample_size : int):
+    '''
+    WAVEDUMP 1: Generator that yields each event iteratively from an opened binary file.
+
+    Parameters
+    ----------
+    file_object (obj) : Opened file object
+    sample_size (int) : Time difference between each sample in the waveform (2 ns for the V1730B digitiser)
+
+    Returns
+    -------
+    data (generator) : Generator yielding one event's worth of data per iteration
+    '''
+
+    # read the first header
+    header = np.fromfile(file_object, dtype = 'i', count = 6)
+
+    # header to check against
+    sanity_header = header.copy()
+
+    # continue only if data exists
+    while len(header) > 0:
+
+        # alter the header to match the expected size
+        header[0] = header[0] - 24
+        event_size = header[0] // sample_size
+
+        # collect waveform, number of samples and timestamp
+        yield (np.fromfile(file_object, dtype = np.dtype('<H'), count = event_size),
+               event_size, header[5])
+
+        # read the next header
+        header = np.fromfile(file_object, dtype = 'i', count = 6)
+
+        # sanity check the first pair of headers against each other
+        if sanity_header is not None:
+            if len(header) > 0:
+                if all([
+                    header[4] == sanity_header[4] + 1,  # event number goes up by 1
+                    header[0] == sanity_header[0],      # number of samples stays the same
+                    header[5] > sanity_header[5]        # timestamp increases
+                ]):
+                    # the first two headers are consistent, disable further checking
+                    sanity_header = None
+                else:
+                    raise MalformedHeaderError(sanity_header, header)
+            else:
+                raise MalformedHeaderError(sanity_header, header)
+    print("Processing Finished!")
+
+
+def process_bin_WD1(file_path : str,
+                    save_path : str,
+                    sample_size : int,
+                    overwrite : Optional[bool] = False,
+                    print_mod : Optional[int] = -1):
+    '''
+    WAVEDUMP 1: Takes a binary file and outputs the contained information in a h5 file.
+    This only works for individual channels at the moment, as wavedump 1 saves each channel
+    as a separate file.
+
+    Makeup of the header (header[n]), where n is:
+        0 - event size (ns in our case, with an extra 24 samples)
+        1 - board ID
+        2 - pattern (not sure exactly what this means)
+        3 - board channel
+        4 - event counter
+        5 - time-tag for the trigger
+    Each of which is a signed 4-byte integer.
+
+    Parameters
+    ----------
+    file_path   (str)  : Path to binary file
+    save_path   (str)  : Path to saved file
+    sample_size (int)  : Size of each sample in an event (2 ns in the case of the V1730B digitiser)
+    overwrite   (bool) : Boolean for overwriting pre-existing files
+    print_mod   (int)  : Print progress every `print_mod` events. -1 disables printing.
+
+    Returns
+    -------
+    None
+    '''
+
+    # let's build it here first and break it up later
+    # destroy the group within the file if you're overwriting
+    save_path = check_save_path(save_path, overwrite)
+    print(save_path)
+
+    # open file for reading
+    with open(file_path, 'rb') as file:
+
+        # open writer object
+        with writer(save_path, 'RAW', overwrite) as write:
+
+            for i, (waveform, samples, timestamp) in enumerate(process_event_lazy_WD1(file, sample_size)):
+
+                if (print_mod != -1) and (i % print_mod == 0):
+                    print(f"Event {i}")
+
+                # enforce structure upon the data
+                e_dtype = types.event_info_type
+                wf_dtype = types.rwf_type_WD1(samples)
+
+                event_info = np.array((i, timestamp, samples, sample_size, 1), dtype = e_dtype)
+                waveforms = np.array((i, 0, waveform), dtype = wf_dtype)
+
+                # first run-through, collect the header information to extract the table size
+                if i == 0:
+                    file_size = os.path.getsize(file_path)
+                    waveform_size = (samples * 2) + (4 * 6)
+                    num_of_events = int(file_size / waveform_size)
+
+                # add data to the file lazily
+                write('event_info', event_info, (True, num_of_events, i))
+                write('rwf', waveforms, (True, num_of_events, i))
+
+
 def process_bin_WD2(file_path : str,
                     save_path : str,
                     overwrite : Optional[bool] = False,
                     counts : Optional[int] = -1):
 
     '''
-    Takes a binary file and outputs the containing waveform information in a h5 file.
+    WAVEDUMP 2: Takes a binary file and outputs the contained waveform information in a h5 file.
 
     For particularly large waveforms/number of events. You can 'chunk' the data such that
     each dataset holds `counts` events.
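Not part of the patch — a worked sketch of the header arithmetic used by `process_event_lazy_WD1` and `process_bin_WD1`, with hypothetical numbers. Each event is a 24-byte header (six 4-byte ints) followed by `samples` 2-byte samples, which is where `event_size` and `num_of_events` come from:

```python
sample_size = 2                       # as set in the WD1 test config
samples     = 1000                    # hypothetical number of samples per waveform

# header[0] carries the event size with an extra 24 on top;
# stripping that and dividing by sample_size recovers the sample count
header_0   = samples * sample_size + 24
event_size = (header_0 - 24) // sample_size
assert event_size == samples          # count passed to np.fromfile

# table size for the fixed-size writer: 2 bytes per sample
# plus a 24-byte header (4 bytes * 6 ints) per event
n_events      = 500                   # hypothetical
file_size     = n_events * ((samples * 2) + (4 * 6))
num_of_events = file_size // ((samples * 2) + (4 * 6))
assert num_of_events == n_events
```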
diff --git a/packs/tests/data/configs/process_WD1.conf b/packs/tests/data/configs/process_WD1.conf
new file mode 100644
index 0000000..b7025c9
--- /dev/null
+++ b/packs/tests/data/configs/process_WD1.conf
@@ -0,0 +1,12 @@
+[required]
+
+process = 'decode'
+wavedump_edition = 1
+file_path = '/path/to/file.bin'
+save_path = '/path/to/file.h5'
+sample_size = 2
+
+[optional]
+
+overwrite = True
+print_mod = 100
\ No newline at end of file
diff --git a/packs/tests/data/configs/process_WD1_1channel.conf b/packs/tests/data/configs/process_WD1_1channel.conf
new file mode 100644
index 0000000..17f33a6
--- /dev/null
+++ b/packs/tests/data/configs/process_WD1_1channel.conf
@@ -0,0 +1,11 @@
+[required]
+process = 'decode'
+wavedump_edition = 1
+file_path = '/home/casper/Documents/MULE/packs/tests/data/one_channel_WD1.dat'
+save_path = '/home/casper/Documents/MULE/packs/tests/data/one_channel_WD1_tmp.h5'
+sample_size = 2
+
+[optional]
+overwrite = True
+print_mod = 100
+
diff --git a/packs/tests/data/one_channel_WD1.dat b/packs/tests/data/one_channel_WD1.dat
new file mode 100644
index 0000000..2e2fd2a
Binary files /dev/null and b/packs/tests/data/one_channel_WD1.dat differ
diff --git a/packs/tests/data/one_channel_WD1.h5 b/packs/tests/data/one_channel_WD1.h5
new file mode 100644
index 0000000..19f4b5a
Binary files /dev/null and b/packs/tests/data/one_channel_WD1.h5 differ
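Not part of the patch — a sketch of what `read_config_file` should produce for the template config above (the path assumes running from the repo root). Sections are flattened into one dict and every value passes through `ast.literal_eval`, which is why strings are quoted inside the .conf:

```python
from packs.core.io import read_config_file

arg_dict = read_config_file('packs/tests/data/configs/process_WD1.conf')

assert arg_dict['process'] == 'decode'       # quoted in the .conf, parsed to str
assert arg_dict['wavedump_edition'] == 1     # bare literal, parsed to int
assert arg_dict['overwrite'] is True         # parsed to bool
```

Note the template originally carried a `counts` key, which `process_bin_WD1` does not accept; it has been replaced here with the required `sample_size`, matching the 1channel config actually used by the tests.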
diff --git a/packs/tests/io_test.py b/packs/tests/io_test.py
new file mode 100644
index 0000000..4cd95f4
--- /dev/null
+++ b/packs/tests/io_test.py
@@ -0,0 +1,57 @@
+import os
+import sys
+
+import numpy as np
+import pandas as pd
+
+from pytest import mark
+from pytest import raises
+from pytest import warns
+
+from packs.core.io import read_config_file
+from packs.core.io import reader
+from packs.core.io import writer
+
+def test_missing_config(tmp_path, MULE_dir):
+    '''
+    Simple test ensuring that when the config file path is wrong,
+    MULE raises a `FileNotFoundError`.
+    '''
+
+    config_path = f'{tmp_path}/false_config.conf'
+
+    with raises(FileNotFoundError):
+        read_config_file(config_path)
+
+
+def test_reader_writer(tmp_path):
+    '''
+    The simplest of tests: ensure the writer produces the expected output.
+    This test should be expanded to cover differing modifications once they are included
+    as fixtures. As this isn't the case yet, there is no need.
+    '''
+
+    file = tmp_path / 'writer_output_tmp.h5'
+
+    test_dtype = np.dtype([
+        ('int', int),
+        ('float', float),
+        ('bool', bool),
+        ('bigfloat', float),
+    ])
+
+    test_dataset = [np.array((0, 1.0, False, 25000.323232), dtype = test_dtype),
+                    np.array((1, 4.0, True, 23456789.321), dtype = test_dtype)]
+
+    # create the writer object
+    with writer(file, 'test_group', overwrite = True) as scribe:
+        # write something to it
+        scribe('test_dataset', test_dataset[0])
+        scribe('test_dataset', test_dataset[1])
+
+    # read it back; exhausting the generator should raise StopIteration
+    scholar = reader(file, 'test_group', 'test_dataset')
+    assert next(scholar).tolist() == test_dataset[0].tolist()
+    assert next(scholar).tolist() == test_dataset[1].tolist()
+    with raises(StopIteration):
+        next(scholar)
\ No newline at end of file
diff --git a/packs/tests/processing_test.py b/packs/tests/processing_test.py
index 57af0db..013e40c 100644
--- a/packs/tests/processing_test.py
+++ b/packs/tests/processing_test.py
@@ -1,3 +1,4 @@
+import os
 import sys
 
 import numpy as np
@@ -11,6 +12,8 @@
 from pytest import warns
 from pytest import fixture
 
+from packs.proc.processing_utils import process_event_lazy_WD1
+from packs.proc.processing_utils import process_bin_WD1
 from packs.proc.processing_utils import read_defaults_WD2
 from packs.proc.processing_utils import process_header
 from packs.proc.processing_utils import read_binary
@@ -22,8 +25,11 @@
 from packs.types.types import rwf_type
 from packs.types.types import event_info_type
 
+from packs.core.core_utils import MalformedHeaderError
+
 from packs.core.io import load_rwf_info
 from packs.core.io import load_evt_info
+from packs.core.io import reader
 from packs.types import types
 
 from hypothesis import given
@@ -181,3 +187,53 @@ def test_decode_produces_expected_output(config, inpt, output, comparison, MULE_
 
     assert load_evt_info(save_path).equals(load_evt_info(comparison_path))
     assert load_rwf_info(save_path, samples).equals(load_rwf_info(comparison_path, samples))
+
+@mark.parametrize("config, inpt, output, comparison", [("process_WD1_1channel.conf", "one_channel_WD1.dat", "one_channel_WD1_tmp.h5", "one_channel_WD1.h5")])
+def test_WD1_decode_produces_expected_output(config, inpt, output, comparison, MULE_dir, data_dir):
+    '''
+    This test will be merged with test_decode_produces_expected_output()
+    once WD2 processing has been updated to match the lazy method of WD1.
+    '''
+
+    # ensure the paths are correct
+    file_path = data_dir + inpt
+    save_path = data_dir + output
+    comparison_path = data_dir + comparison
+    config_path = data_dir + "configs/" + config
+
+    # rewrite paths to files
+    cnfg = configparser.ConfigParser()
+    cnfg.read(config_path)
+    cnfg.set('required', 'file_path', "'" + file_path + "'")  # values need surrounding quotes to survive ast.literal_eval
+    cnfg.set('required', 'save_path', "'" + save_path + "'")
+
+    with open(config_path, 'w') as cfgfile:
+        cnfg.write(cfgfile)
+
+    # run processing pack decode
+    run_pack = ['python3', MULE_dir + "/bin/mule", "proc", config_path]
+    subprocess.run(run_pack)
+
+    # the event info can be read out like a normal h5; the RWF cannot, due to how it is structured
+    assert pd.read_hdf(save_path, 'RAW/event_info').equals(pd.read_hdf(comparison_path, 'RAW/event_info'))
+    assert [x for x in reader(save_path, 'RAW', 'rwf')] == [x for x in reader(comparison_path, 'RAW', 'rwf')]
+
+
+def test_lazy_loading_malformed_data(MULE_dir):
+    '''
+    Test that a file without an appropriate header is flagged when passed through.
+    At the moment the checks for this are:
+        - event number goes up by 1 between events
+        - number of samples stays the same across two events
+        - timestamp increases between events
+    These may not always hold, but they ensure the test works as expected.
+    '''
+
+    data_path = MULE_dir + "/packs/tests/data/malformed_data.bin"
+
+    with raises(MalformedHeaderError):
+        with open(data_path, 'rb') as file:
+            a = process_event_lazy_WD1(file, sample_size = 2)
+            next(a)
+            next(a)
\ No newline at end of file
diff --git a/packs/types/types.py b/packs/types/types.py
index 4722041..d859318 100644
--- a/packs/types/types.py
+++ b/packs/types/types.py
@@ -31,6 +31,15 @@ def rwf_type(samples : int) -> np.dtype:
         ('rwf', np.float32, (samples,))
     ])
 
+def rwf_type_WD1(samples : int) -> np.dtype:
+    '''
+    WAVEDUMP 1: Generates the data-type for raw waveforms.
+    '''
+
+    return np.dtype([('event_number', int),
+                     ('channels', int),
+                     ('rwf', np.uint16, (samples,))])
+
 def generate_wfdtype(channels, samples):
     '''
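Not part of the patch — a quick sketch of the row layout `rwf_type_WD1` produces (values hypothetical):

```python
import numpy as np
from packs.types.types import rwf_type_WD1

samples = 4                                        # hypothetical waveform length
row = np.array((0, 0, np.zeros(samples, dtype = np.uint16)),
               dtype = rwf_type_WD1(samples))

assert row['event_number'] == 0                    # single-channel WD1 files use channel 0
assert row['rwf'].shape == (samples,)
assert row['rwf'].dtype == np.uint16               # matches the '<H' samples read by process_event_lazy_WD1
```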