diff --git a/.gitignore b/.gitignore index 0a19790..3898e1a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,7 @@ +# CAEN stuff +CAENBoard_GENERICLog.txt +CAENParametersLog.txt + # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] diff --git a/CAENBoard_GENERICLog.txt b/CAENBoard_GENERICLog.txt new file mode 100644 index 0000000..ed3a60d --- /dev/null +++ b/CAENBoard_GENERICLog.txt @@ -0,0 +1,2 @@ +[ 0.009][EE][CAENBoard_GENERIC.c::2526]: Open digitizer failed due to a comm error: -1 +[ 0.009][EE][CAENDPPLayer.c::118]: From c_newDigitizer: Error Calling CAENDigitizer API diff --git a/configs/debug.conf b/configs/debug.conf index 2700f72..61c47f9 100644 --- a/configs/debug.conf +++ b/configs/debug.conf @@ -6,4 +6,10 @@ con_type = 'A4818' link_num = 0 conet_node = 0 vme_base_address = 0 -dig_authority = 'caen.internal' \ No newline at end of file +dig_authority = 'caen.internal' + +[output_settings] + +file_name = 'output_file' # this will always be appended with a timestamp and .h5 +overwrite = True # overwrite the file if you want + diff --git a/configs/recording/five_ns_window.conf b/configs/recording/five_ns_window.conf index eecdef7..3786b0d 100644 --- a/configs/recording/five_ns_window.conf +++ b/configs/recording/five_ns_window.conf @@ -4,6 +4,7 @@ record_length = 4096 # ns pre_trigger = 512 # ns trigger_mode = 'SELFTRIG' # look into the differing methods, trigger on channel based on threshold is an option # SWTRIG, SELFTRIG, (not yet implemented) +software_timeout = 0 # hard software timeout between each digitiser poll (s) [channel_settings] @@ -16,5 +17,39 @@ ch1 = {'enabled' : False, 'self_trigger': False, 'threshold' : 0, 'polarity' : 'positive'} + +ch2 = {'enabled' : True, + 'self_trigger': True, + 'threshold' : 600, + 'polarity' : 'positive'} + +ch3 = {'enabled' : False, + 'self_trigger': False, + 'threshold' : 0, + 'polarity' : 'positive'} + +ch4 = {'enabled' : False, + 'self_trigger': False, + 'threshold' : 0, + 'polarity' : 'positive'} + +ch5 = {'enabled' : False, + 'self_trigger': False, + 'threshold' : 0, + 'polarity' : 'positive'} + +ch6 = {'enabled' : False, + 'self_trigger': False, + 'threshold' : 0, + 'polarity' : 'positive'} + +ch7 = {'enabled' : False, + 'self_trigger': False, + 'threshold' : 0, + 'polarity' : 'positive'} +[output_settings] +file_name = 'data/output_file' # this will always be appended with a timestamp and .h5 +overwrite = True # overwrite the file if you want +h5_flush_size = 20 # number of values added to h5 files per write diff --git a/core/controller.py b/core/controller.py index 99c60e7..3f971ce 100644 --- a/core/controller.py +++ b/core/controller.py @@ -1,6 +1,7 @@ import numpy as np import logging import time +from datetime import datetime #from caen_felib import lib, device, error from typing import Optional @@ -25,6 +26,7 @@ from core.logging import setup_logging from core.commands import CommandType, Command from core.worker import AcquisitionWorker +from core.writer import Writer from core.tracker import Tracker from felib.digitiser import Digitiser from ui import oscilloscope @@ -48,26 +50,51 @@ def __init__(self, # Digitiser configuration self.dig_config = dig_config self.rec_config = rec_config + self.dig_dict = read_config_file(self.dig_config) + self.rec_dict = read_config_file(self.rec_config) + + # initialise a universal event counter for sanity purposes + self.event_counter = 0 # Thread-safe communication channels self.cmd_buffer = Queue(maxsize=10) self.display_buffer = Queue(maxsize=1024) - self.stop_event = Event() + self.worker_stop_event = Event() + self.writer_stop_event = Event() + self.recording = False # Acquisition worker + self.sw_timeout = self.rec_dict['software_timeout'] self.worker = AcquisitionWorker( cmd_buffer=self.cmd_buffer, display_buffer=self.display_buffer, - stop_event=self.stop_event, + stop_event=self.worker_stop_event, + sw_timeout = self.sw_timeout ) # Set the callback to the controller's data_handling method self.worker.data_ready_callback = self.data_handling - # Start thread and log + # Start acquisition worker thread and log self.worker.start() logging.info("Acquisition worker thread started.") + # Multi channel writes to h5 + self.ch_mapping = self.get_ch_mapping() + self.num_ch = len(self.ch_mapping) + self.h5_flush_size = self.rec_dict['h5_flush_size'] + self.writer_buffers = [Queue(maxsize=1024) for _ in range(self.num_ch)] + self.writers = [Writer( + ch=curr_ch, + flush_size=self.h5_flush_size, + write_buffer=self.writer_buffers[i], + stop_event=self.writer_stop_event, + rec_config = read_config_file(self.rec_config), + dig_config = read_config_file(self.dig_config), + TIMESTAMP = datetime.now().strftime("%H:%M:%S") + ) + for curr_ch, i in self.ch_mapping.items()] + # gui second self.app = QApplication([]) self.main_window = oscilloscope.MainWindow(controller = self) @@ -78,6 +105,20 @@ def __init__(self, self.connect_digitiser() + def get_ch_mapping(self): + ''' + Extract what channels are being used map them: ch -> index + ''' + mapping = {} + i = 0 + for entry in self.rec_dict: + if 'ch' in entry: + if self.rec_dict[entry]['enabled']: + ch = int(entry[2:]) + mapping[ch] = i + i += 1 + + return mapping def data_handling(self): ''' @@ -91,7 +132,8 @@ def data_handling(self): break try: - wf_size, ADCs = data + # you must pass wf_size and ADCs through. + wf_size, ADCs, ch = data # update visuals self.main_window.screen.update_ch(np.arange(0, wf_size, dtype=wf_size.dtype), ADCs) @@ -99,6 +141,17 @@ def data_handling(self): # ping the tracker (make this optional) self.tracker.track(ADCs.nbytes) + # push data to writer buffer + if self.recording: + write_data = wf_size, ADCs, self.event_counter + + # multi channel writing + ch = int(ch) + i = int(self.ch_mapping[ch]) + self.writer_buffers[i].put(write_data) + + self.event_counter += 1 + except Exception as e: logging.exception(f"Error updating display: {e}") @@ -148,15 +201,59 @@ def stop_acquisition(self): logging.info("Stopping acquisition.") self.cmd_buffer.put(Command(CommandType.STOP)) + def start_recording(self): + ''' + Start recording data. + ''' + self.recording = True + for w in self.writers: + if not w.is_alive(): + w.start() + logging.info(f"Writer (channel {w.ch}) thread started.") + + logging.info("Starting recording.") + + def stop_recording(self): + ''' + Stop recording data. + ''' + self.recording = False + self.writer_stop_event.set() + + for w in self.writers: + w.join(timeout=2) + + logging.info("Stopping recording.") + def shutdown(self): ''' Carefully shut down acquisition and worker thread. ''' logging.info("Shutting down controller.") + + # Acquisition Worker thread self.cmd_buffer.put(Command(CommandType.EXIT)) - self.stop_event.set() + self.worker_stop_event.set() self.worker.join(timeout=2) + + # Writer threads + self.writer_stop_event.set() + for w in self.writers: + w.join(timeout=2) + + clean_shutdown = True + if self.worker.is_alive(): + clean_shutdown = False logging.warning("AcquisitionWorker did not stop cleanly.") - else: + + for w in self.writers: + if w.is_alive(): + clean_shutdown = False + logging.warning(f"Writer (channel {w.ch}) did not stop cleanly.") + + if clean_shutdown: logging.info("Controller shutdown complete.") + + else: + logging.info("Controller shutdown failed.") diff --git a/core/df_classes.py b/core/df_classes.py new file mode 100644 index 0000000..1725862 --- /dev/null +++ b/core/df_classes.py @@ -0,0 +1,31 @@ +import tables as tb +from typing import Type + + +class config_class(tb.IsDescription): + ''' + Holds dictionary (key, value) pairs + ''' + key = tb.StringCol(90) # 90 character string maximum, you've been warned! + value = tb.StringCol(90) + + + +def return_rwf_class(WD_version : str, shape : int) -> Type[tb.IsDescription]: + ''' + Based on MULE shapes, expect output to be formatted as such, for forwards compatibility. + ''' + if WD_version == 1: + class rwf_df(tb.IsDescription): + evt_no = tb.UInt32Col() + channel = tb.UInt32Col() + timestamp = tb.UInt64Col() + rwf = tb.UInt16Col(shape=(shape,)) + elif WD_version == 2: + class rwf_df(tb.IsDescription): + evt_no = tb.UInt32Col() + channel = tb.UInt32Col() + timestamp = tb.UInt64Col() + rwf = tb.Float32Col(shape = (shape,)) + + return rwf_df diff --git a/core/io.py b/core/io.py index bdbc14b..491d2cd 100644 --- a/core/io.py +++ b/core/io.py @@ -1,4 +1,6 @@ import pandas as pd +import tables as tb +import core.df_classes as df_class import ast import configparser @@ -42,4 +44,31 @@ def read_config_file(file_path : str) -> dict: # we can setup stricter rules at some other time arg_dict[key] = ast.literal_eval(config[section][key]) - return arg_dict \ No newline at end of file + return arg_dict + + +def create_config_table(h5file : tb.File, dictionary : dict, name : str, description = "config"): + + # create config node if it doesnt exist already + try: + group = h5file.get_node("/", "config") + except tb.NoSuchNodeError: + group = h5file.create_group("/", "config", "Config parameters") + + # create table + table = h5file.create_table(group, name, df_class.config_class, description) + # assign the rows by component + config_details = table.row + for key, values in dictionary.items(): + if type(values) is dict: + # single nest only! any more and you've made the config too complicated + for key_2, value_2 in values.items(): + #print(key_2) + config_details['key'] = f'{key}/{key_2}' + config_details['value'] = value_2 + config_details.append() + else: + config_details['key'] = key + config_details['value'] = values + config_details.append() + table.flush() diff --git a/core/worker.py b/core/worker.py index e12a0f2..2788c0f 100644 --- a/core/worker.py +++ b/core/worker.py @@ -14,7 +14,7 @@ class AcquisitionWorker(Thread): All commands and data flow through thread-safe mechanisms (queue, locks, events). ''' - def __init__(self, cmd_buffer: Queue, display_buffer: Queue, stop_event: Event): + def __init__(self, cmd_buffer: Queue, display_buffer: Queue, stop_event: Event, sw_timeout: float): super().__init__(daemon=True) self.digitiser = None self.stop_event = stop_event @@ -24,6 +24,7 @@ def __init__(self, cmd_buffer: Queue, display_buffer: Queue, stop_event: Event): self.data_ready_callback = None # set by Controller self.dig_config = None self.rec_config = None + self.sw_timeout = sw_timeout # set in config file (s) def enqueue_cmd(self, cmd_type: CommandType, *args): ''' @@ -142,7 +143,7 @@ def run(self): logging.exception(f"Acquisition error: {e}") # to avoid busy digitiser - add software timeout as member variable - time.sleep(1) + time.sleep(self.sw_timeout) except Exception as e: logging.exception(f"Fatal error in AcquisitionWorker: {e}") diff --git a/core/writer.py b/core/writer.py new file mode 100644 index 0000000..b6d5cc0 --- /dev/null +++ b/core/writer.py @@ -0,0 +1,114 @@ +from queue import Queue, Empty +from threading import Thread, Event, Lock +import logging +import tables as tb + +import core.df_classes as df_class +import core.io as io + + +class Writer(Thread): + ''' + Writes channel data to h5 file. + ''' + def __init__(self, + ch : int, + flush_size : int, + write_buffer : Queue, + stop_event : Event, + rec_config : dict, + dig_config : dict, + TIMESTAMP : str,): + ''' + TIMESTAMP should be provided to all channels identically before the + writer threads are initialised. + ''' + + super().__init__(daemon=True) + self.ch = ch + self.flush_size = flush_size + self.write_buffer = write_buffer + self.stop_event = stop_event + self.rec_config = rec_config + self.dig_config = dig_config + self.filename = f"some_name_ch_{self.ch}" + self.local_buffer = [] + self.wf_size = None + + if 'file_name' in self.rec_config: + file_path = self.rec_config['file_name'] + file_path = f'{file_path}_{self.ch}_{TIMESTAMP}.h5' + else: + file_path = f'{self.ch}_{TIMESTAMP}.h5' + # initialise the h5, one per channel, each handled on a separate thread + self.h5file = tb.open_file(f'{file_path}', mode='a') + # configs written + io.create_config_table(self.h5file, self.rec_config, 'rec_conf', 'recording config') + io.create_config_table(self.h5file, self.dig_config, 'dig_conf', 'digitiser config') + # raw waveform group constructed + self.rwf_group = self.h5file.create_group('/', f'ch_{ch}', 'raw waveform') + + + def write_h5(self): + ''' + Write local buffer to h5 file and then clear local buffer. + + assumption is that the local buffer contains tuples of: + (waveform_size, ADCs, event_no) + where ADCs is the actual raw waveform array + ''' + + for wf_size, rwf, evt in self.local_buffer: + # if we know the size of the waveforms already, don't create the class again. + if self.wf_size is None: + self.wf_size = wf_size + self.rwf_class = df_class.return_rwf_class(self.dig_config['dig_gen'], self.wf_size) + self.rwf_table = self.h5file.create_table(self.rwf_group, 'rwf', self.rwf_class, "raw waveforms") + self.rows = self.rwf_table.row + + self.rows['evt_no'] = evt + self.rows['rwf'] = rwf + self.rows.append() + + self.local_buffer.clear() + + # flush as fast as the buffer provides + self.rwf_table.flush() + pass + + + def run(self): + ''' + Writer hot loop. + ''' + logging.info(f"Writer thread started (ch {self.ch}).") + try: + while not self.stop_event.is_set(): + # First load data from shared buffer into local buffer + for _ in range(self.flush_size): + try: + self.local_buffer.append(self.write_buffer.get_nowait()) + except Empty: # exit for loop if shared buffer is empty + break + + # If no data was added to local buffer, don't write to h5 file + if len(self.local_buffer) == 0: + continue + + # Write all data in local buffer to h5 file + self.write_h5() + + except Exception as e: + logging.exception(f"Fatal error in Writer (ch {self.ch}): {e}") + + # When stop_event() is set, call cleanup() + self.cleanup() + logging.info(f"Writer thread exited cleanly (ch {self.ch}).") + + def cleanup(self): + ''' + Handles cleanup of writer thread and h5 file. + ''' + # close the h5 file + self.h5file.close() + pass diff --git a/felib/digitiser.py b/felib/digitiser.py index 97bd76b..8621172 100644 --- a/felib/digitiser.py +++ b/felib/digitiser.py @@ -232,6 +232,9 @@ def stop_acquisition(self): logging.exception("Stopping acsquisition failed:") def acquire(self): + ''' + Must return data with format (wf_size, ADCs, ch) + ''' match self.trigger_mode: case 'SWTRIG': return self.SW_record() @@ -252,7 +255,10 @@ def SW_record(self): try: self.endpoint.has_data(check_timeout) self.endpoint.read_data(read_timeout, self.data) # timeout first number in ms - return (self.data[7].value, self.data[3].value) + wf_size = self.data[7].value + ADCs = self.data[3].value + ch = self.data[0].value + return (wf_size, ADCs, ch) except error.Error as ex: #logging.exception("Error in readout:") if ex.code is error.ErrorCode.TIMEOUT: @@ -279,7 +285,10 @@ def SELFTRIG_record(self): try: self.endpoint.has_data(check_timeout) self.endpoint.read_data(read_timeout, self.data) - return (self.data[7].value, self.data[3].value) + wf_size = self.data[7].value + ADCs = self.data[3].value + ch = self.data[0].value + return (wf_size, ADCs, ch) except error.Error as ex: #logging.exception("Error in readout:") if ex.code is error.ErrorCode.TIMEOUT: diff --git a/ui/elements.py b/ui/elements.py index cc1e32c..fb17510 100644 --- a/ui/elements.py +++ b/ui/elements.py @@ -147,7 +147,7 @@ def update(self): else: self.start_stop.setStyleSheet("background-color: green; color: black") self.start_stop.clicked.connect(self.toggle_acquisition) - self.record.setStyleSheet("background-color: red; color: black") + self.record.setStyleSheet("background-color: green; color: black") self.record.clicked.connect(self.toggle_recording) @@ -177,3 +177,18 @@ def toggle_recording(self): if digitiser exists, must force digitiser.isAcquiring then enables digitiser.isRecording also ''' + if self.recording: + logging.info('Stopping recording...') + self.record.setStyleSheet("background-color: green; color: black") + self.recording = False + # stop the recording + self.controller.stop_recording() + else: + if self.acquiring: + logging.info('Starting recording...') + self.record.setStyleSheet("background-color: red; color: white") + self.recording = True + # start the recording + self.controller.start_recording() + else: + logging.info('Attempted to record but failed since not acquiring...')