16 changes: 16 additions & 0 deletions packs/core/core_utils.py
@@ -12,3 +12,19 @@ def check_test(file):
return True
else:
return False


# THIS SHOULD BE MOVED ELSEWHERE
class MalformedHeaderError(Exception):
'''
Raised when two consecutive event headers don't match up as expected.
Created initially for WD1 processing, but should be back-ported to WD2.
'''

def __init__(self, header1, header2):
self.header1 = header1
self.header2 = header2

def __str__(self):
return f"MalformedHeaderError: Headers don't output expected result. Ensure the .dat file provided is formatted correctly.\nFirst Header {self.header1}\nSecond Header {self.header2}"

122 changes: 122 additions & 0 deletions packs/core/io.py
@@ -1,9 +1,18 @@
import os

import pandas as pd
import numpy as np

import h5py
import ast
import configparser

from contextlib import contextmanager

from typing import Optional
from typing import Generator
from typing import Union
from typing import Tuple

from packs.types import types

@@ -87,6 +96,10 @@ def read_config_file(file_path : str) -> dict:
# setup config parser
config = configparser.ConfigParser()

if not os.path.exists(file_path):
raise FileNotFoundError(2, 'No such config file', file_path)

# read in arguments, checking the required ones are present
config.read(file_path)
arg_dict = {}
@@ -99,3 +112,112 @@ def read_config_file(file_path : str) -> dict:
arg_dict[key] = ast.literal_eval(config[section][key])

return arg_dict


@contextmanager
def writer(path : str,
group : str,
overwrite : Optional[bool] = True) -> Generator:
'''
Context manager for a lazy h5 writer that iteratively writes to a dataset, with the layout:
FILE.h5 -> GROUP/DATASET
Includes overwriting functionality, which will overwrite **GROUPS** at will if needed.

Parameters
----------
path (str)       : File path
group (str)      : Group within the h5 file
overwrite (bool) : Boolean for overwriting a previous group (OPTIONAL)

Returns
-------
write (func) : write function described in write()

Fixed size is for when you know the size of the output beforehand, so you set the size
of the dataset up front, saving precious IO operations. The input then becomes a tuple
of (True, DATASET_SIZE, INDEX); otherwise it is False.
'''


# open file if exists, create group or overwrite it
h5f = h5py.File(path, 'a')
try:
if overwrite:
if group in h5f:
del h5f[group]

gr = h5f.require_group(group)

def write(dataset : str,
data : np.ndarray,
fixed_size : Union[bool, Tuple[bool, int, int]] = False) -> None:
'''
Writes an ndarray to a dataset within the group defined in writer().
Fixed size is used to speed up writing: if enabled, it
creates a dataset of a fixed size rather than
growing the dataset iteratively.

Parameters
----------
dataset (str)  : Dataset name to write to
data (ndarray) : Data to write*
fixed_size (Union[bool, Tuple[bool, int, int]])
               : Mode that is either enabled or disabled.
                 False (disabled) -> Iteratively grows the dataset at runtime
                 Tuple (enabled)  -> Requires a tuple containing
                                     (True, number of events, index to write to)
This method is best seen in action in `process_bin_WD1()`.
* Data should be in a numpy structured array format, as can be seen in WD1 and WD2 processing
'''
if fixed_size is False:
# create dataset if it doesn't exist; if it does, enlarge it
if dataset in gr:
dset = gr[dataset]
dset.resize((dset.shape[0] + 1, *dset.shape[1:]))
dset[-1] = data
else:
max_shape = (None,) + data.shape
dset = gr.require_dataset(dataset, shape = (1,) + data.shape,
maxshape = max_shape, dtype = data.dtype,
chunks = True)
dset[0] = data
else:
index = fixed_size[2]
# dataset of fixed size
if dataset in gr:
dset = gr[dataset]
else:
dset = gr.require_dataset(dataset, shape = (fixed_size[1],) + data.shape,
maxshape = (fixed_size[1],) + data.shape, dtype = data.dtype,
chunks = True)
dset[index] = data

yield write

finally:
h5f.close()
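
# --- usage sketch (illustrative; not part of io.py) -----------------------
# A minimal example of the writer in use. File, group and dataset names are
# placeholders; the fixed-size tuple is (True, total number of rows, index).
#
#     with writer('example.h5', 'RAW', overwrite = True) as write:
#         for i in range(10):
#             row = np.zeros(4)
#             write('grown', row)                 # grow-as-you-go mode
#             write('fixed', row, (True, 10, i))  # fixed-size mode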


def reader(path : str,
group : str,
dataset : str) -> Generator:
'''
A lazy h5 reader that iteratively reads from a dataset, with the layout:
FILE.h5 -> GROUP/DATASET

Parameters
----------
path (str)    : File path
group (str)   : Group name within the h5 file
dataset (str) : Dataset name within the group

Returns
-------
row (generator) : Generator that yields the next row of the dataset each time it is advanced.
'''

with h5py.File(path, 'r') as h5f:
gr = h5f[group]
dset = gr[dataset]

for row in dset:
yield row
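
A matching sketch for reading the rows back lazily (names follow the writer example above):

    for row in reader('example.h5', 'RAW', 'grown'):
        print(row)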
3 changes: 3 additions & 0 deletions packs/proc/proc.py
@@ -2,6 +2,7 @@

from packs.core.io import read_config_file
from packs.proc.processing_utils import process_bin_WD2
from packs.proc.processing_utils import process_bin_WD1
from packs.core.core_utils import check_test

def proc(config_file):
@@ -22,6 +23,8 @@ def proc(config_file):
case 'decode':
if conf_dict['wavedump_edition'] == 2:
process_bin_WD2(**arg_dict)
elif conf_dict['wavedump_edition'] == 1:
process_bin_WD1(**arg_dict)
else:
raise RuntimeError(f"wavedump edition {conf_dict['wavedump_edition']} decoding isn't currently implemented.")
case default:
121 changes: 120 additions & 1 deletion packs/proc/processing_utils.py
@@ -14,6 +14,8 @@

# imports start from MULE/
from packs.core.core_utils import flatten
from packs.core.core_utils import MalformedHeaderError
from packs.core.io import writer
from packs.types import types

"""
@@ -369,13 +371,130 @@ def check_save_path(save_path : str,
return save_path


def process_event_lazy_WD1(file_object : BinaryIO,
sample_size : int):

'''
WAVEDUMP 1: Generator that yields each event iteratively from an opened binary file.

Parameters
----------
file_object (obj) : Opened file object
sample_size (int) : Time difference between each sample in the waveform (2 ns for the V1730B digitiser)

Returns
-------
data (generator) : Generator object yielding one event's worth of data
                   per iteration
'''

# read first header
header = np.fromfile(file_object, dtype = 'i', count = 6)

# header to check against
sanity_header = header.copy()

# continue only if data exists
while len(header) > 0:

# alter header to match expected size
header[0] = header[0] - 24
event_size = header[0] // sample_size

# collect waveform, no of samples and timestamp
yield (np.fromfile(file_object, dtype = np.dtype('<H'), count = event_size), event_size, header[-1])

# collect next header
header = np.fromfile(file_object, dtype = 'i', count = 6)

# check ONCE (against the second header) that it has the right number of elements and consistent information
if sanity_header is not None and len(header) > 0:
if len(header) == 6:
if all([header[0] == sanity_header[0], # event size
header[4] == sanity_header[4] + 1, # event number +1
header[5] > sanity_header[5] # timestamp increases
]):
sanity_header = None
else:
raise MalformedHeaderError(sanity_header, header)
else:
raise MalformedHeaderError(sanity_header, header)
print("Processing Finished!")


def process_bin_WD1(file_path : str,
save_path : str,
sample_size : int,
overwrite : Optional[bool] = False,
print_mod : Optional[int] = -1):

'''
WAVEDUMP 1: Takes a binary file and outputs the contained information in a h5 file.
This only works for individual channels at the moment, as wavedump 1 saves each channel
as a separate file.

# Makeup of the header (header[n]) where n is:
# 0 - event size (ns in our case, with an extra 24 samples)
# 1 - board ID
# 2 - pattern (not sure exactly what this means)
# 3 - board channel
# 4 - event counter
# 5 - Time-tag for the trigger
# Each of which is a signed 4-byte integer

Parameters
----------
file_path (str)   : Path to binary file
save_path (str)   : Path to saved file
sample_size (int) : Size of each sample in an event (2 ns in the case of the V1730B digitiser)
overwrite (bool)  : Boolean for overwriting pre-existing files
print_mod (int)   : Print a progress line every `print_mod` events; -1 disables printing

Returns
-------
None
'''


# let's build it here first and break it up later
# destroy the group within the file if you're overwriting
save_path = check_save_path(save_path, overwrite)
print(save_path)


# open file for reading
with open(file_path, 'rb') as file:

# open writer object
with writer(save_path, 'RAW', overwrite) as write:

for i, (waveform, samples, timestamp) in enumerate(process_event_lazy_WD1(file, sample_size)):

if (i % print_mod == 0) and (print_mod != -1):
print(f"Event {i}")

# enforce structure upon data
e_dtype = types.event_info_type
wf_dtype = types.rwf_type_WD1(samples)

event_info = np.array((i, timestamp, samples, sample_size, 1), dtype = e_dtype)
waveforms = np.array((i, 0, waveform), dtype = wf_dtype)

# first run-through, collect the header information to extract table size
if i == 0:
file_size = os.path.getsize(file_path)
waveform_size = (samples * 2) + (4*6)
num_of_events = int(file_size / waveform_size)

# add data to df lazily
write('event_info', event_info, (True, num_of_events, i))
write('rwf', waveforms, (True, num_of_events, i))
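
# --- header sketch (illustrative) -------------------------------------------
# Unpacking the six-field header described in the docstring, plus the event
# size arithmetic used above: 2 bytes per '<H' sample and a 24-byte
# (6 x 4-byte) header, so e.g. 1000 samples -> 2000 + 24 = 2024 bytes/event.
# Assumes sample_size = 2, as in process_event_lazy_WD1():
#
#     header = np.fromfile(f, dtype = 'i', count = 6)
#     event_size, board_id, pattern, channel, event_no, time_tag = header
#     n_samples = (event_size - 24) // 2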


def process_bin_WD2(file_path : str,
save_path : str,
overwrite : Optional[bool] = False,
counts : Optional[int] = -1):

'''
WAVEDUMP 2: Takes a binary file and outputs the containing waveform information in a h5 file.

For particularly large waveforms/numbers of events, you can 'chunk' the data so that
each dataset holds `counts` events.
12 changes: 12 additions & 0 deletions packs/tests/data/configs/process_WD1.conf
@@ -0,0 +1,12 @@
[required]

process = 'decode'
wavedump_edition = 1
file_path = '/path/to/file.bin'
save_path = '/path/to/file.h5'

[optional]

overwrite = True
counts = -1
print_mod = 100
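
Since read_config_file() parses every value with ast.literal_eval, each value must be a valid Python literal, hence the quoted strings above. A minimal sketch of loading such a config (the path points at the test file added in this PR):

    from packs.core.io import read_config_file

    conf = read_config_file('packs/tests/data/configs/process_WD1.conf')
    assert conf['process'] == 'decode'     # quoted in the file -> str
    assert conf['wavedump_edition'] == 1   # unquoted -> int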
11 changes: 11 additions & 0 deletions packs/tests/data/configs/process_WD1_1channel.conf
@@ -0,0 +1,11 @@
[required]
process = 'decode'
wavedump_edition = 1
file_path = '/home/casper/Documents/MULE/packs/tests/data/one_channel_WD1.dat'
save_path = '/home/casper/Documents/MULE/packs/tests/data/one_channel_WD1_tmp.h5'
sample_size = 2

[optional]
overwrite = True
print_mod = 100

Binary file added packs/tests/data/one_channel_WD1.dat
Binary file not shown.
Binary file added packs/tests/data/one_channel_WD1.h5
Binary file not shown.