diff --git a/packs/configs/process_WD1.conf b/packs/configs/process_WD1.conf
new file mode 100644
index 0000000..cfbb8e9
--- /dev/null
+++ b/packs/configs/process_WD1.conf
@@ -0,0 +1,12 @@
+[required]
+
+process = 'decode'
+wavedump_edition = 1
+file_path = '/path/to/file.bin'
+save_path = '/path/to/file.h5'
+sample_size = 2
+
+[optional]
+
+overwrite = True
+print_mod = 100
diff --git a/packs/core/core_utils.py b/packs/core/core_utils.py
index 4bb9ea2..187f129 100644
--- a/packs/core/core_utils.py
+++ b/packs/core/core_utils.py
@@ -12,3 +12,19 @@ def check_test(file):
         return True
     else:
         return False
+
+# THIS SHOULD BE MOVED ELSEWHERE
+class MalformedHeaderError(Exception):
+    '''
+    Error raised when two consecutive event headers don't match up.
+
+    Created initially for WD1 processing, but should be back-ported to WD2.
+    '''
+
+    def __init__(self, header1, header2):
+        self.header1 = header1
+        self.header2 = header2
+
+    def __str__(self):
+        return f"MalformedHeaderError: Headers don't match the expected pattern. Ensure the .dat file provided is formatted correctly.\nFirst header: {self.header1}\nSecond header: {self.header2}"
+
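Reviewer note: a minimal sketch of how the new exception is meant to surface. The header values here are illustrative (six signed 4-byte ints, per the WD1 header layout documented further down); only the class itself comes from the diff above.

    import numpy as np
    from packs.core.core_utils import MalformedHeaderError

    header1 = np.array([2072, 0, 0, 0, 7, 1000], dtype='i')  # event counter 7
    header2 = np.array([2072, 0, 0, 0, 9, 900],  dtype='i')  # counter skips, timestamp goes backwards

    try:
        raise MalformedHeaderError(header1, header2)
    except MalformedHeaderError as err:
        print(err)  # prints both headers so the mismatch can be inspected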
diff --git a/packs/core/io.py b/packs/core/io.py
index 6c7b2d0..8099ceb 100644
--- a/packs/core/io.py
+++ b/packs/core/io.py
@@ -1,9 +1,18 @@
 import pandas as pd
+import numpy as np
 import h5py
 import ast
 import configparser
+from contextlib import contextmanager
+
+from typing import Optional
+from typing import Generator
+from typing import Union
+from typing import Tuple
+
+from functools import partial
 
 from packs.types import types
 
@@ -99,3 +108,123 @@ def read_config_file(file_path : str) -> dict:
             arg_dict[key] = ast.literal_eval(config[section][key])
 
     return arg_dict
+
+
+@contextmanager
+def writer(path : str,
+           group : str,
+           overwrite : Optional[bool] = True) -> Generator:
+    '''
+    Outer function for a lazy h5 writer that iteratively writes to a dataset, with the formatting:
+
+    FILE.h5 -> GROUP/DATASET
+
+    Includes overwriting functionality, which will overwrite **GROUPS** at will if needed.
+
+    Parameters
+    ----------
+
+    path      (str)  : File path
+    group     (str)  : Group within the h5 file
+    overwrite (bool) : Boolean for overwriting the previous dataset (OPTIONAL)
+
+    Returns
+    -------
+
+    write (func) : write function described in write()
+
+
+    Fixed size is for when you know the size of the output file, so you set the size
+    of the dataset beforehand, saving precious IO operations. The input then becomes
+    a tuple of (True, DF_SIZE, INDEX); otherwise it is False.
+    '''
+
+
+    # open the file, then create the group or overwrite it
+    h5f = h5py.File(path, 'a')
+    try:
+        if overwrite:
+            if group in h5f:
+                del h5f[group]
+
+        gr = h5f.require_group(group)
+
+        def write(dataset : str,
+                  data : np.ndarray,
+                  fixed_size : Union[bool, Tuple[bool, int, int]] = False) -> None:
+            '''
+            Writes an ndarray to a dataset within the group defined in writer().
+
+            Fixed size is used to speed up writing: when enabled it
+            creates a dataset of a fixed size rather than
+            increasing the size iteratively.
+
+            Parameters
+            ----------
+
+            dataset (str)     : Dataset name to write to
+            data    (ndarray) : Data to write*
+            fixed_size (Union[bool, Tuple[bool, int, int]])
+                              : Mode that is either enabled or disabled.
+                                False (disabled) -> Iteratively increases the size of the dataset at runtime
+                                Tuple (enabled)  -> Requires a tuple containing
+                                (True, number of events, index to write to)
+                                This method is best seen in action in `process_bin_WD1()`.
+
+            * Data should be in a numpy structured array format, as can be seen in WD1 and WD2 processing
+            '''
+            if fixed_size is False:
+                # create the dataset if it doesn't exist; if it does, grow it by one row
+                if dataset in gr:
+                    dset = gr[dataset]
+                    dset.resize((dset.shape[0] + 1, *dset.shape[1:]))
+                    dset[-1] = data
+                else:
+                    max_shape = (None,) + data.shape
+                    dset = gr.require_dataset(dataset, shape = (1,) + data.shape,
+                                              maxshape = max_shape, dtype = data.dtype,
+                                              chunks = True)
+                    dset[0] = data
+            else:
+                index = fixed_size[2]
+                # dataset of fixed size
+                if dataset in gr:
+                    dset = gr[dataset]
+                else:
+                    dset = gr.require_dataset(dataset, shape = (fixed_size[1],) + data.shape,
+                                              maxshape = (fixed_size[1],) + data.shape,
+                                              dtype = data.dtype, chunks = True)
+                dset[index] = data
+
+        yield write
+
+    finally:
+        h5f.close()
+
+def reader(path : str,
+           group : str,
+           dataset : str) -> Generator:
+    '''
+    A lazy h5 reader that iteratively reads from a dataset, with the formatting:
+
+    FILE.h5 -> GROUP/DATASET
+
+    Parameters
+    ----------
+
+    path    (str) : File path
+    group   (str) : Group name within the h5 file
+    dataset (str) : Dataset name within the group
+
+    Returns
+    -------
+
+    row (generator) : Generator that yields the next row from the dataset on each call.
+    '''
+
+    with h5py.File(path, 'r') as h5f:
+        gr = h5f[group]
+        dset = gr[dataset]
+
+        for row in dset:
+            yield row
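Reviewer note: a minimal usage sketch of the new lazy I/O pair, mirroring the io_test.py added below. The file name and dtype are illustrative; the real callers use the structured types from packs.types.

    import numpy as np
    from packs.core.io import writer, reader

    row_t = np.dtype([('event_number', np.uint32), ('adc', np.float32)])

    # append two rows one at a time; the dataset is resized on each call
    with writer('example.h5', 'RAW', overwrite = True) as write:
        write('rwf', np.array((0, 12.5), dtype = row_t))
        write('rwf', np.array((1, 13.0), dtype = row_t))

    # read them back lazily, one row per iteration
    for row in reader('example.h5', 'RAW', 'rwf'):
        print(row)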
diff --git a/packs/proc/proc.py b/packs/proc/proc.py
index ec95240..7c49111 100644
--- a/packs/proc/proc.py
+++ b/packs/proc/proc.py
@@ -1,7 +1,7 @@
 import os
 
 from packs.core.io import read_config_file
-from packs.proc.processing_utils import process_bin_WD2
+from packs.proc.processing_utils import process_bin_WD2, process_bin_WD1
 from packs.core.core_utils import check_test
 
 def proc(config_file):
@@ -22,6 +22,8 @@ def proc(config_file):
         case 'decode':
             if conf_dict['wavedump_edition'] == 2:
                 process_bin_WD2(**arg_dict)
+            elif conf_dict['wavedump_edition'] == 1:
+                process_bin_WD1(**arg_dict)
             else:
                 raise RuntimeError(f"wavedump edition {conf_dict['wavedump_edition']} decoding isn't currently implemented.")
         case default:
diff --git a/packs/proc/processing_utils.py b/packs/proc/processing_utils.py
index 7102b49..5a29321 100644
--- a/packs/proc/processing_utils.py
+++ b/packs/proc/processing_utils.py
@@ -13,7 +13,9 @@
 from typing import Optional
+from typing import BinaryIO
 
 # imports start from MULE/
-from packs.core.core_utils import flatten
+from packs.core.core_utils import MalformedHeaderError, flatten
+from packs.core.io import writer
 from packs.types import types
 
 """
@@ -24,95 +25,6 @@
 """
-
-
-def raw_to_h5_WD1(PATH, save_h5 = False, verbose = False, print_mod = 0):
-    '''
-    **UNTESTED/DEPRECATED. BE AWARE THIS FUNCTION MAY NOT WORK AS DESIRED**
-
-    Takes binary data files (.dat) produced using Wavedump 1
-    and decodes them into waveforms, which are then inserted into
-    pandas dataframes.
-
-    These dataframes can then be saved as h5 files for further use.
-
-    Parameters
-    ----------
-
-    PATH      (str)  : File path of interest
-    save_h5   (bool) : Flag for saving data
-    verbose   (bool) : Flag for outputting information
-    print_mod (int)  : Print modifier
-
-    Returns
-    -------
-
-    data (int 2D array) : 2D array of events
-                          First element defines event
-                          Second element defines ADC value
-    '''
-
-    # Makeup of the header (array[n]) where n is:
-    # 0 - event size (ns in our case, with extra 24 samples)
-    # 1 - board ID
-    # 2 - pattern (not sure exactly what this means)
-    # 3 - board channel
-    # 4 - event counter
-    # 5 - Time-tag for the trigger
-
-    # Output data is a collection of ints defined in size
-    # by (event size - 24) // 2
-
-    file = open(PATH, 'rb')
-    data = []
-
-    print("File open! Processing...")
-    # Collect data, while true loops are always dangerous but lets ignore that here :)
-    while (True):
-
-        # take the header information from the file (first 6 elements)
-        array = np.fromfile(file, dtype='i', count=6)
-
-        # breaking condition
-        if len(array) == 0:
-            print("Processing finished! Saving...")
-            break
-
-        # printing events
-        if (array[4] % int(print_mod) == 0):
-            print("Event {}".format(array[4]))
-
-        # verbose check
-        if (verbose == True):
-            array_tag = ['event size (ns)', 'board ID', 'pattern', 'board channel', 'event counter', 'trigger tag']
-            for i in range(len(array)):
-                print("{}: {}".format(array_tag[i], array[i]))
-
-
-
-        # alter event size to the samples
-        array[0] = array[0] - 24
-
-        # collect event
-        event_size = array[0] // 2
-
-        int16bit = np.dtype('<H')
-        data.append(np.fromfile(file, dtype=int16bit, count=event_size))
-
-    file.close()
-
-    if save_h5:
-        pd.DataFrame(data).to_hdf(PATH + '.h5', key = 'RAW')
-
-    return data
-
-
 def process_header(header) -> (int, int, int, int):
     '''
@@ -292,6 +204,7 @@ def format_wfs(data : np.ndarray,
 
     return event_information, waveform
 
+
 def save_data(event_information : np.ndarray,
               rwf : np.ndarray,
               save_path : str,
@@ -338,7 +251,7 @@ def save_data(event_information : np.ndarray,
 
 
 def check_save_path(save_path : str,
-                    overwrite : bool):
+                    overwrite : Optional[bool] = True):
     '''
     Checks that the save_path is valid/doesn't already exist and, if it does,
     either `overwrite` it or create an additional file with a number added.
@@ -369,13 +282,140 @@ def check_save_path(save_path : str,
 
     return save_path
 
+
+def process_event_lazy_WD1(file_object : BinaryIO,
+                           sample_size : int):
+
+    '''
+    WAVEDUMP 1: Generator that yields each event iteratively from an opened binary file.
+
+    Parameters
+    ----------
+
+    file_object (obj) : Opened file object
+    sample_size (int) : Bytes per sample, used to turn the header's event size into a
+                        sample count (2 for the V1730B digitiser, i.e. 2 ns per sample)
+
+    Returns
+    -------
+
+    data (generator) : Generator that yields one event's worth of data
+                       (waveform, number of samples, timestamp) at a time
+    '''
+
+    # read the first header
+    header = np.fromfile(file_object, dtype = 'i', count = 6)
+
+    # header to check against
+    sanity_header = header.copy()
+
+    # continue only if data exists
+    while len(header) > 0:
+
+        # alter the header to match the expected size
+        header[0] = header[0] - 24
+        event_size = header[0] // sample_size
+
+        # collect waveform, number of samples and timestamp
+        yield (np.fromfile(file_object, dtype = np.dtype('<H'), count = event_size),
+               event_size,
+               header[5])
+
+        # read the next header
+        header = np.fromfile(file_object, dtype = 'i', count = 6)
+
+        # check the first two consecutive headers follow the expected format
+        if sanity_header is not None:
+            if len(header) > 0:
+                if all([header[4] == sanity_header[4] + 1,  # event number goes up by one
+                        header[0] == sanity_header[0],      # number of samples stays the same
+                        header[5] > sanity_header[5]        # timestamp increases
+                        ]):
+                    sanity_header = None
+                else:
+                    raise MalformedHeaderError(sanity_header, header)
+            else:
+                raise MalformedHeaderError(sanity_header, header)
+    print("Processing Finished!")
+
+
+def process_bin_WD1(file_path : str,
+                    save_path : str,
+                    sample_size : int,
+                    overwrite : Optional[bool] = False,
+                    print_mod : Optional[int] = -1):
+
+    '''
+    WAVEDUMP 1: Takes a binary file and outputs the contained information to an h5 file.
+    This only works for individual channels at the moment, as wavedump 1 saves each channel
+    as a separate file.
+
+    Makeup of the header (header[n]) where n is:
+    0 - event size (ns in our case, with an extra 24 samples)
+    1 - board ID
+    2 - pattern (not sure exactly what this means)
+    3 - board channel
+    4 - event counter
+    5 - Time-tag for the trigger
+    Each is a signed 4-byte integer.
+
+
+    Parameters
+    ----------
+
+    file_path   (str)  : Path to the binary file
+    save_path   (str)  : Path to the saved file
+    sample_size (int)  : Size of each sample in an event (2 in the case of the V1730B digitiser)
+    overwrite   (bool) : Boolean for overwriting pre-existing files
+    print_mod   (int)  : Print progress every `print_mod` events; -1 disables printing
+
+
+    Returns
+    -------
+    None
+    '''
+
+
+    # lets build it here first and break it up later
+    # destroy the group within the file if you're overwriting
+    save_path = check_save_path(save_path, overwrite)
+    print(save_path)
+
+
+    # open the binary file for reading
+    with open(file_path, 'rb') as file:
+
+        # open the writer object
+        with writer(save_path, 'RAW', overwrite) as write:
+
+            for i, (waveform, samples, timestamp) in enumerate(process_event_lazy_WD1(file, sample_size)):
+
+                if (print_mod != -1) and (i % print_mod == 0):
+                    print(f"Event {i}")
+
+                # enforce structure upon the data
+                e_dtype  = types.event_info_type
+                wf_dtype = types.rwf_type_WD1(samples)
+
+                event_info = np.array((i, timestamp, samples, sample_size, 1), dtype = e_dtype)
+                waveforms  = np.array((i, 0, waveform), dtype = wf_dtype)
+
+                # first run-through: use the file size and per-event size
+                # (2 bytes per sample plus a 24-byte header) to fix the table size
+                if i == 0:
+                    file_size     = os.path.getsize(file_path)
+                    waveform_size = (samples * 2) + (4 * 6)
+                    num_of_events = int(file_size / waveform_size)
+
+                # add data to the file lazily, at a fixed index
+                write('event_info', event_info, (True, num_of_events, i))
+                write('rwf', waveforms, (True, num_of_events, i))
+
+
 def process_bin_WD2(file_path : str,
                     save_path : str,
                     overwrite : Optional[bool] = False,
                     counts : Optional[int] = -1):
     '''
-    Takes a binary file and outputs the containing waveform information in a h5 file.
+    WAVEDUMP 2: Takes a binary file and outputs the contained waveform information in an h5 file.
 
     For particularly large waveforms/numbers of events, you can 'chunk' the data such that
     each dataset holds `counts` events.
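Reviewer note: a sketch of the lazy decode loop this enables, driven directly rather than through proc(). The input path is hypothetical; the per-event size arithmetic mirrors the first run-through in process_bin_WD1() above.

    import os
    import numpy as np
    from packs.proc.processing_utils import process_event_lazy_WD1

    path = 'one_channel_WD1.dat'  # hypothetical WD1 single-channel file
    sample_size = 2               # 2 bytes (2 ns) per sample on the V1730B

    with open(path, 'rb') as f:
        for i, (waveform, samples, timestamp) in enumerate(process_event_lazy_WD1(f, sample_size)):
            if i == 0:
                # each event occupies a 24-byte header (six 4-byte ints)
                # plus 2 bytes per sample
                event_bytes   = samples * 2 + 4 * 6
                num_of_events = os.path.getsize(path) // event_bytes
                print(num_of_events, 'events expected')
            print(i, timestamp, waveform.min(), waveform.max())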
diff --git a/packs/tests/data/configs/process_WD1_1channel.conf b/packs/tests/data/configs/process_WD1_1channel.conf
new file mode 100644
index 0000000..3bef863
--- /dev/null
+++ b/packs/tests/data/configs/process_WD1_1channel.conf
@@ -0,0 +1,11 @@
+[required]
+process = 'decode'
+wavedump_edition = 1
+file_path = '/home/e78368jw/Documents/MULE/packs/tests/data/one_channel_WD1.dat'
+save_path = '/home/e78368jw/Documents/MULE/packs/tests/data/one_channel_WD1_tmp.h5'
+sample_size = 2
+
+[optional]
+overwrite = True
+print_mod = 100
+
diff --git a/packs/tests/data/configs/single_multi_chan.conf b/packs/tests/data/configs/single_multi_chan.conf
index 857dfa7..7faa6df 100644
--- a/packs/tests/data/configs/single_multi_chan.conf
+++ b/packs/tests/data/configs/single_multi_chan.conf
@@ -1,8 +1,8 @@
 [required]
 process = 'decode'
 wavedump_edition = 2
-file_path = 'single_multi_chan.bin'
-save_path = 'single_multi_chan.h5'
+file_path = 'packs/tests/data/single_multi_chan.bin'
+save_path = 'packs/tests/data/single_multi_chan_tmp.h5'
 
 [optional]
 overwrite = True
diff --git a/packs/tests/data/one_channel_WD1.dat b/packs/tests/data/one_channel_WD1.dat
new file mode 100644
index 0000000..2e2fd2a
Binary files /dev/null and b/packs/tests/data/one_channel_WD1.dat differ
diff --git a/packs/tests/data/one_channel_WD1.h5 b/packs/tests/data/one_channel_WD1.h5
new file mode 100644
index 0000000..19f4b5a
Binary files /dev/null and b/packs/tests/data/one_channel_WD1.h5 differ
diff --git a/packs/tests/io_test.py b/packs/tests/io_test.py
new file mode 100644
index 0000000..92b3151
--- /dev/null
+++ b/packs/tests/io_test.py
@@ -0,0 +1,48 @@
+import os
+import sys
+
+import numpy as np
+import pandas as pd
+
+from pytest import mark
+from pytest import raises
+from pytest import warns
+
+from packs.core.io import reader
+from packs.core.io import writer
+
+
+def test_reader_writer(tmp_path):
+    '''
+    The simplest of tests: ensure the writer produces the expected output.
+
+    This test should be expanded to cover differing modifications once they are
+    included as fixtures. As this isn't the case yet, there is no need.
+    '''
+
+    file = tmp_path / 'writer_output_tmp.h5'
+
+    test_dtype = np.dtype([
+        ('int', int),
+        ('float', float),
+        ('bool', bool),
+        ('bigfloat', float),
+    ])
+
+    test_dataset = [np.array((0, 1.0, False, 25000.323232), dtype = test_dtype),
+                    np.array((1, 4.0, True, 23456789.321), dtype = test_dtype)]
+
+    # create the writer object
+    with writer(file, 'test_group', overwrite = True) as scribe:
+        # write something to it
+        scribe('test_dataset', test_dataset[0])
+        scribe('test_dataset', test_dataset[1])
+
+    # read it back; the rows should match, and a third read should raise StopIteration
+    scholar = reader(file, 'test_group', 'test_dataset')
+    assert next(scholar).tolist() == test_dataset[0].tolist()
+    assert next(scholar).tolist() == test_dataset[1].tolist()
+    with raises(StopIteration):
+        next(scholar)
+
diff --git a/packs/tests/processing_test.py b/packs/tests/processing_test.py
index 4366160..a487707 100644
--- a/packs/tests/processing_test.py
+++ b/packs/tests/processing_test.py
@@ -2,6 +2,7 @@
 import sys
 
 import numpy as np
+
 import pandas as pd
 
 import subprocess
@@ -11,7 +12,7 @@
 from pytest import raises
 from pytest import warns
 
-from packs.proc.processing_utils import read_defaults_WD2
+from packs.proc.processing_utils import process_bin_WD1, process_event_lazy_WD1, read_defaults_WD2
 from packs.proc.processing_utils import process_header
 from packs.proc.processing_utils import read_binary
 from packs.proc.processing_utils import format_wfs
@@ -22,8 +23,11 @@
 from packs.types.types import rwf_type
 from packs.types.types import event_info_type
 
+from packs.core.core_utils import MalformedHeaderError
+
 from packs.core.io import load_rwf_info
 from packs.core.io import load_evt_info
+from packs.core.io import reader
 from packs.types import types
 
 from hypothesis import given
@@ -171,8 +175,9 @@ def test_runtime_error_when_too_many_save_files():
     with raises(RuntimeError):
         check_save_path(relevant_dir + 'test_.txt', overwrite=False)
 
+
 @mark.parametrize("config, inpt, output, comparison", [("process_WD2_1channel.conf", "one_channel_WD2.bin", "one_channel_tmp.h5", "one_channel_WD2.h5"),
-                                                      ("process_WD2_3channel.conf", "three_channels_WD2.bin", "three_channels_tmp.h5", "three_channels_WD2.h5")])
+                                                       ("process_WD2_3channel.conf", "three_channels_WD2.bin", "three_channels_tmp.h5", "three_channels_WD2.h5")])
 def test_decode_produces_expected_output(config, inpt, output, comparison):
 
     MULE_dir = str(os.environ['MULE_DIR'])
@@ -203,3 +208,57 @@ def test_decode_produces_expected_output(config, inpt, output, comparison):
 
     assert load_evt_info(save_path).equals(load_evt_info(comparison_path))
     assert load_rwf_info(save_path, samples).equals(load_rwf_info(comparison_path, samples))
+
+@mark.parametrize("config, inpt, output, comparison", [("process_WD1_1channel.conf", "one_channel_WD1.dat", "one_channel_WD1_tmp.h5", "one_channel_WD1.h5")])
+def test_WD1_decode_produces_expected_output(config, inpt, output, comparison):
+    '''
+    This test will be merged with test_decode_produces_expected_output()
+    once WD2 processing has been updated to match the lazy method of WD1.
+    '''
+
+    MULE_dir = str(os.environ['MULE_DIR'])
+    data_dir = "/packs/tests/data/"
+
+    # ensure the paths are correct
+    file_path       = MULE_dir + data_dir + inpt
+    save_path       = MULE_dir + data_dir + output
+    comparison_path = MULE_dir + data_dir + comparison
+    config_path     = MULE_dir + data_dir + "configs/" + config
+
+    # rewrite the paths in the config file
+    cnfg = configparser.ConfigParser()
+    cnfg.read(config_path)
+    cnfg.set('required', 'file_path', "'" + file_path + "'")  # quotes needed around the paths for config reasons
+    cnfg.set('required', 'save_path', "'" + save_path + "'")
+
+    with open(config_path, 'w') as cfgfile:
+        cnfg.write(cfgfile)
+
+    # run the processing pack decode
+    run_pack = ['python3', MULE_dir + "/bin/mule", "proc", config_path]
+    subprocess.run(run_pack)
+
+    # the event info can be read out like a normal h5; the RWFs cannot, due to how they're structured
+    assert pd.read_hdf(save_path, 'RAW/event_info').equals(pd.read_hdf(comparison_path, 'RAW/event_info'))
+    assert [x for x in reader(save_path, 'RAW', 'rwf')] == [x for x in reader(comparison_path, 'RAW', 'rwf')]
+
+def test_lazy_loading_malformed_data():
+    '''
+    Test that a file with no appropriate header is flagged as malformed.
+
+    At the moment the checks for this are:
+    - the event number goes up by +1 between events
+    - the number of samples stays the same across two events
+    - the timestamp increases between events
+    These may not always hold, but they ensure the test works as expected.
+    '''
+
+    MULE_dir  = str(os.environ['MULE_DIR'])
+    data_path = MULE_dir + "/packs/tests/data/malformed_data.bin"
+
+    with raises(MalformedHeaderError):
+        with open(data_path, 'rb') as file:
+            a = process_event_lazy_WD1(file, sample_size = 2)
+            next(a)
+            next(a)
+
diff --git a/packs/types/types.py b/packs/types/types.py
index 4722041..c4b7823 100644
--- a/packs/types/types.py
+++ b/packs/types/types.py
@@ -2,16 +2,16 @@
 
 event_info_type = np.dtype([
-    ('event_number', np.uint32),
-    ('timestamp', np.uint64),
-    ('samples', np.uint32),
-    ('sampling_period', np.uint64),
+    ('event_number',    np.uint32),
+    ('timestamp',       np.uint64),
+    ('samples',         np.uint32),
+    ('sampling_period', np.uint64),
     ('channels', np.int32),
 ])
 
 def rwf_type(samples : int) -> np.dtype:
     """
-    Generates the data-type for raw waveforms
+    Generates the data-type for raw waveforms
 
     Parameters
     ----------
@@ -26,12 +26,22 @@
     """
 
     return np.dtype([
-        ('event_number', np.uint32),
+        ('event_number', np.uint32),
         ('channels', np.int32),
         ('rwf', np.float32, (samples,))
     ])
 
+def rwf_type_WD1(samples : int) -> np.dtype:
+    '''
+    WAVEDUMP 1: Generates the data-type for raw waveforms.
+    '''
+
+    return np.dtype([('event_number', int),
+                     ('channels', int),
+                     ('rwf', np.uint16, (samples,))])
+
+
 def generate_wfdtype(channels, samples):
     '''
     generates the dtype for collecting the binary data based on samples and number of
     channels
     '''
@@ -39,21 +49,21 @@
     if channels >1:
         wdtype = np.dtype([
-            ('event_number', np.uint32),
-            ('timestamp', np.uint64),
-            ('samples', np.uint32),
-            ('sampling_period', np.uint64),
+            ('event_number',    np.uint32),
+            ('timestamp',       np.uint64),
+            ('samples',         np.uint32),
+            ('sampling_period', np.uint64),
             ('channels', np.int32),
-            ] +
+            ] +
             [(f'chan_{i+1}', np.float32, (samples,)) for i in range(0,channels)]
         )
     else:
         wdtype = np.dtype([
-            ('event_number', np.uint32),
-            ('timestamp', np.uint64),
-            ('samples', np.uint32),
+            ('event_number',    np.uint32),
+            ('timestamp',       np.uint64),
+            ('samples',         np.uint32),
             ('sampling_period', np.uint64),
             ('chan_1', np.float32, (samples,))
         ])
 
-    return wdtype
+    return wdtype
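Reviewer note: a small sketch of what rwf_type_WD1() produces and how process_bin_WD1() builds a row from it (the sample values are illustrative).

    import numpy as np
    from packs.types.types import rwf_type_WD1

    wf_dtype = rwf_type_WD1(4)                  # four samples per waveform
    row = np.array((0, 0, [1, 2, 3, 2]), dtype = wf_dtype)
    print(row['rwf'].dtype, row['rwf'].shape)   # uint16 (4,)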
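End to end, the WD1 decode is driven through the proc pack exactly as test_WD1_decode_produces_expected_output() invokes it via subprocess (assuming MULE_DIR is set and the config paths point at a real .dat file):

    python3 $MULE_DIR/bin/mule proc packs/configs/process_WD1.conf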