diff --git a/core/commands.py b/core/commands.py new file mode 100644 index 0000000..aa70cc3 --- /dev/null +++ b/core/commands.py @@ -0,0 +1,15 @@ +from enum import Enum, auto +from dataclasses import dataclass + +class CommandType(Enum): + START = 0 + STOP = 1 + CONNECT = auto() + UPDATE = auto() + CH_DISPLAY = auto() + +@dataclass +class Command: + type: CommandType + args: tuple = () + diff --git a/core/controller.py b/core/controller.py index 238a63f..99c60e7 100644 --- a/core/controller.py +++ b/core/controller.py @@ -23,9 +23,15 @@ from core.io import read_config_file from core.logging import setup_logging +from core.commands import CommandType, Command +from core.worker import AcquisitionWorker +from core.tracker import Tracker from felib.digitiser import Digitiser from ui import oscilloscope +from threading import Thread, Event, Lock +from queue import Queue, Empty + class Controller: def __init__(self, dig_config: Optional[str] = None, @@ -37,24 +43,30 @@ def __init__(self, # Initialise logging and tracking setup_logging() self.tracker = Tracker() + logging.info("Controller initialising.") - # digitiser connection first + # Digitiser configuration self.dig_config = dig_config self.rec_config = rec_config - if dig_config is None: - logging.warning("No digitiser configuration file provided. Digitiser will not be connected.") - self.digitiser = None - else: - self.digitiser = self.connect_digitiser() + # Thread-safe communication channels + self.cmd_buffer = Queue(maxsize=10) + self.display_buffer = Queue(maxsize=1024) + self.stop_event = Event() - # check digitiser connection, if valid set isConnected to True - if self.digitiser is not None: - self.digitiser.isConnected = True - logging.info(f"Digitiser connected: {self.digitiser.URI}") - else: - logging.warning("Digitiser not connected.") + # Acquisition worker + self.worker = AcquisitionWorker( + cmd_buffer=self.cmd_buffer, + display_buffer=self.display_buffer, + stop_event=self.stop_event, + ) + # Set the callback to the controller's data_handling method + self.worker.data_ready_callback = self.data_handling + + # Start thread and log + self.worker.start() + logging.info("Acquisition worker thread started.") # gui second self.app = QApplication([]) @@ -64,50 +76,31 @@ def __init__(self, self.fps_timer.timeout.connect(self.update_fps) self.spf = 1 # seconds per frame + self.connect_digitiser() - # worker third - if self.digitiser is not None and self.digitiser.isConnected: - self.initialise_worker() - def initialise_worker(self): + def data_handling(self): ''' - Initialise the worker thread. - This in turn should begin the data collection (I think?) + Visualise data. ''' + while True: + try: + # non-blocking read from display queue + data = self.display_buffer.get_nowait() + except Empty: + break - # create thread to manage data output - self.worker_wait_condition = QWaitCondition() - self.acquisition_worker = AcquisitionWorker(self.worker_wait_condition, digitiser = self.digitiser) - self.acquisition_thread = QThread() - self.acquisition_worker.moveToThread(self.acquisition_thread) - self.acquisition_thread.started.connect(self.acquisition_worker.run) - self.acquisition_worker.data_ready.connect(self.data_handling) - self.acquisition_thread.start() - - - def data_handling(self): - # visualise (and at some point, collect in a file) - try: - wf_size, ADCs = self.acquisition_worker.data - except TypeError: - # type error occurs when recording in digitiser fails, so no error output here please! - return - except Exception as e: - logging.exception("Error in data_handling(): ") - - + try: + wf_size, ADCs = data - # save the data (PUT IT HERE) + # update visuals + self.main_window.screen.update_ch(np.arange(0, wf_size, dtype=wf_size.dtype), ADCs) + + # ping the tracker (make this optional) + self.tracker.track(ADCs.nbytes) - # update visuals - self.main_window.screen.update_ch(np.arange(0, wf_size, dtype=wf_size.dtype), ADCs) - - # ping the tracker (make this optional) - self.tracker.track(ADCs.nbytes) - - # prep the next thread - if self.digitiser.isAcquiring: - self.worker_wait_condition.notify_one() + except Exception as e: + logging.exception(f"Error updating display: {e}") def update_fps(self): @@ -126,112 +119,44 @@ def connect_digitiser(self): Connect to the digitiser using the provided configuration file. This is a placeholder function and should be replaced with actual digitiser connection logic. + + Need to allow for changing config files after initial application launch. ''' - # Load in configs - dig_dict = read_config_file(self.dig_config) - rec_dict = read_config_file(self.rec_config) - - if dig_dict is None: - logging.error("Digitiser configuration file not found or invalid.") - #raise ValueError("Digitiser configuration file not found or invalid.") - else: - digitiser = Digitiser(dig_dict) - digitiser.connect() - # Only add to the main window if it exists - if hasattr(self, 'main_window'): - self.main_window.control_panel.acquisition.update() - - # once connected, configure recording setup - if rec_dict is None: - logging.warning("No recording configuration file provided.") - else: - if (digitiser is not None) and digitiser.isConnected: - digitiser.configure(dig_dict, rec_dict) - return digitiser - + # Load in new configs + # self.dig_dict = some other dig_config + # self.rec_dict = some other rec_config + + self.cmd_buffer.put(Command(CommandType.CONNECT, (self.dig_config, self.rec_config))) + + # Only add to the main window if it exists + if hasattr(self, 'main_window'): + self.main_window.control_panel.acquisition.update() + def start_acquisition(self): ''' - Start the acquisition in multiple steps: - - Start the digitiser acquisition based on whatever trigger - settings are applied, - - Initialise the data reader, - - Initialise the output visuals. + Start digitiser acquisition. ''' - try: - self.digitiser.start_acquisition() - self.worker_wait_condition.wakeAll() - except Exception as e: - logging.exception('Failed to start acquisition.') - #self.digitiser.start_acquisition() - #self.trigger_and_record() + logging.info("Starting acquisition.") + self.cmd_buffer.put(Command(CommandType.START)) def stop_acquisition(self): ''' - Simple stopping of acquisition, this will end the AcquisitionWorkers loop and terminate + Stop digitiser acquisition. ''' - self.digitiser.isAcquiring = False - - -class AcquisitionWorker(QObject): + logging.info("Stopping acquisition.") + self.cmd_buffer.put(Command(CommandType.STOP)) - data_ready = Signal() - - def __init__(self, wait_condition, digitiser, parent=None): - super().__init__(parent=parent) - self.wait_condition = wait_condition - self.digitiser = digitiser - self.mutex = QMutex() - # ensure on initial startup that you're not acquiring. - self.digitiser.isAcquiring = False - - - def run(self): - - - - while True: - self.mutex.lock() - if not self.digitiser.isAcquiring: - self.wait_condition.wait(self.mutex) - self.mutex.unlock() - - - self.data = self.digitiser.acquire() - self.data_ready.emit() - - self.stop() - - def stop(self): - self.digitiser.stop_acquisition() - self.wait_condition.wakeAll() - - -class Tracker: - ''' - Tracking class that keeps track of: - - number of collected events - - speed at which data is being collected - ''' - - def __init__(self): - self.start_time = time.perf_counter() - self.bytes_ps = 0 - self.events_ps = 0 - self.last_time = self.start_time - - def track(self, nbytes: int = 0): + def shutdown(self): ''' - Tracker outputting the number of events that arrive per second + Carefully shut down acquisition and worker thread. ''' - self.events_ps += 1 - self.bytes_ps += nbytes - - t_check = time.perf_counter() - if t_check - self.last_time >= 1.0: - MB = self.bytes_ps / 1000000 - logging.info(f'|| {self.events_ps} events/sec || {MB:.2f} MB/sec ||') - self.last_time = t_check - self.bytes_ps = 0 - self.events_ps = 0 \ No newline at end of file + logging.info("Shutting down controller.") + self.cmd_buffer.put(Command(CommandType.EXIT)) + self.stop_event.set() + self.worker.join(timeout=2) + if self.worker.is_alive(): + logging.warning("AcquisitionWorker did not stop cleanly.") + else: + logging.info("Controller shutdown complete.") diff --git a/core/tracker.py b/core/tracker.py new file mode 100644 index 0000000..09be9ec --- /dev/null +++ b/core/tracker.py @@ -0,0 +1,33 @@ +import logging +import time +from threading import Lock + +class Tracker: + ''' + Tracking class that keeps track of: + - number of collected events + - speed at which data is being collected + ''' + + def __init__(self): + self.start_time = time.perf_counter() + self.bytes_ps = 0 + self.events_ps = 0 + self.last_time = self.start_time + self.lock = Lock() + + def track(self, nbytes: int = 0): + ''' + Tracker outputting the number of events that arrive per second + ''' + with self.lock: + self.events_ps += 1 + self.bytes_ps += nbytes + + t_check = time.perf_counter() + if t_check - self.last_time >= 1.0: + MB = self.bytes_ps / 1000000 + logging.info(f'|| {self.events_ps} events/sec || {MB:.2f} MB/sec ||') + self.last_time = t_check + self.bytes_ps = 0 + self.events_ps = 0 diff --git a/core/worker.py b/core/worker.py new file mode 100644 index 0000000..e12a0f2 --- /dev/null +++ b/core/worker.py @@ -0,0 +1,165 @@ +from queue import Queue, Empty +from threading import Thread, Event, Lock +import logging +import time +from core.commands import CommandType, Command +from felib.digitiser import Digitiser +from core.io import read_config_file + +class AcquisitionWorker(Thread): + ''' + Handles digitiser I/O in a background thread. + + This class is designed to be thread-safe and independent from Qt threading. + All commands and data flow through thread-safe mechanisms (queue, locks, events). + ''' + + def __init__(self, cmd_buffer: Queue, display_buffer: Queue, stop_event: Event): + super().__init__(daemon=True) + self.digitiser = None + self.stop_event = stop_event + self.cmd_buffer = cmd_buffer + self.display_buffer = display_buffer + self.data_buffer = Queue() + self.data_ready_callback = None # set by Controller + self.dig_config = None + self.rec_config = None + + def enqueue_cmd(self, cmd_type: CommandType, *args): + ''' + Global interface for Controller. + ''' + self.cmd_buffer.put(Command(cmd_type, args)) + + def handle_command(self, cmd: Command): + ''' + Handles commands sent to AcquisitionWorker. Currently supports the commands: + - CONNECT + - START + - STOP + - EXIT + ''' + logging.debug(f"Handling command: {cmd.type}") + args = cmd.args + try: + match cmd.type: + case CommandType.CONNECT: + self.connect_digitiser(*args) + case CommandType.START: + self.start_acquisition() + case CommandType.STOP: + self.cleanup() + case CommandType.EXIT: + self.stop_event.set() + case _: + logging.warning(f"Unknown command: {cmd.type}") + except Exception as e: + logging.exception(f"Command {cmd.type} failed: {e}") + + def start_acquisition(self): + ''' + Starts digitiser acquisition. First checks to see if there is a digitiser connected. + If not already connected, connect using the config files. Finally, tell the digitiser + to start acquisition. + ''' + if self.digitiser is None: + logging.info("No digitiser instance — reconnecting before start.") + if self.dig_config is None or self.rec_config is None: + logging.error("No stored configuration — cannot reconnect digitiser.") + return + self.connect_digitiser(self.dig_config, self.rec_config) + try: + self.digitiser.start_acquisition() + logging.info("Digitiser acquisition started successfully.") + except Exception as e: + logging.exception(f"Start acquisition failed: {e}") + + def connect_digitiser(self, dig_config, rec_config): + ''' + Connect to digitiser with given configs. + ''' + # cache configs + self.dig_config = dig_config + self.rec_config = rec_config + + # Load in configs + dig_dict = read_config_file(dig_config) + rec_dict = read_config_file(rec_config) + + if dig_dict is None: + logging.error("Digitiser configuration file not found or invalid.") + return + + self.digitiser = Digitiser(dig_dict) + self.digitiser.connect() + + # once connected, configure recording setup + if rec_dict is None: + logging.warning("No recording configuration file provided.") + else: + if (self.digitiser is not None) and self.digitiser.isConnected: + self.digitiser.configure(dig_dict, rec_dict) + + def run(self): + ''' + Data acquisition hot loop. Hot loop runs until stop_event is set either manually + or via the EXIT command. + ''' + logging.info("AcquisitionWorker thread started.") + try: + while not self.stop_event.is_set(): + # Handle commands + while True: + try: + cmd = self.cmd_buffer.get(timeout=0.01) + self.handle_command(cmd) + except Empty: # exit cmd loop if cmd buffer is empty + break + + # Acquire data if running + if self.digitiser and self.digitiser.isAcquiring: + try: + data = self.digitiser.acquire() + if data is None: + continue + + # Non-blocking put to visual buffer + if self.display_buffer.full(): + try: + self.display_buffer.get_nowait() # discard oldest + except Empty: + pass + + # Push to display buffer (etc.) + if not self.display_buffer.full(): + self.display_buffer.put_nowait(data) + + # Notify controller/UI + if self.data_ready_callback: + self.data_ready_callback() + + except Exception as e: + logging.exception(f"Acquisition error: {e}") + + # to avoid busy digitiser - add software timeout as member variable + time.sleep(1) + + except Exception as e: + logging.exception(f"Fatal error in AcquisitionWorker: {e}") + + # when stop_event() is set, call destructor of digitiser inside cleanup() + self.cleanup() + logging.info("AcquisitionWorker thread exited cleanly.") + + def cleanup(self): + ''' + Cleans up digitiser by calling stop_acquisition and its destructor. + ''' + if self.digitiser: + if self.digitiser.isAcquiring: + self.digitiser.stop_acquisition() + del self.digitiser + self.digitiser = None + logging.info("Digitiser fully cleaned up after STOP.") + + diff --git a/felib/digitiser.py b/felib/digitiser.py index dfed235..97bd76b 100644 --- a/felib/digitiser.py +++ b/felib/digitiser.py @@ -6,6 +6,7 @@ import numpy as np import logging from typing import Optional +import time from felib.dig1_utils import generate_digitiser_uri @@ -205,7 +206,7 @@ def start_acquisition(self): try: self.dig.cmd.ARMACQUISITION() except Exception as e: - logging.exception("Starting acquisition failed:") + logging.exception(f"Starting acquisition failed: {e}") # start recording function #self.trigger_and_record() @@ -230,7 +231,6 @@ def stop_acquisition(self): except Exception as e: logging.exception("Stopping acsquisition failed:") - def acquire(self): match self.trigger_mode: case 'SWTRIG': @@ -297,4 +297,4 @@ def __del__(self): self.stop_acquisition() if hasattr(self, 'dig') and self.dig is not None: logging.info("Closing digitiser connection.") - self.dig.close() \ No newline at end of file + self.dig.close() diff --git a/ui/elements.py b/ui/elements.py index 5020e5f..cc1e32c 100644 --- a/ui/elements.py +++ b/ui/elements.py @@ -112,6 +112,9 @@ def __init__(self, controller, parent=None): self.controller = controller + # local flags (rather than using digitiser flags <-- race condition) + self.acquiring = False + self.recording = False self.start_stop = QPushButton("Start") self.record = QPushButton("Record") @@ -123,37 +126,49 @@ def __init__(self, controller, parent=None): layout.addWidget(self.start_stop) layout.addWidget(self.record) # update button based on digitiser state + self.update() - def update(self): + def update(self): ''' Update the acquisition status based on the digitiser state. ''' - if self.controller.digitiser is None: + try: + # Safely disconnect previous button signal connections + self.start_stop.clicked.disconnect() + self.record.clicked.disconnect() + except TypeError: + pass # ignore error if no connections exist + + if self.controller is None: self.start_stop.setStyleSheet("background-color: grey; color: black") self.record.setStyleSheet("background-color: grey; color: black") else: - # start stop self.start_stop.setStyleSheet("background-color: green; color: black") self.start_stop.clicked.connect(self.toggle_acquisition) - # recording... self.record.setStyleSheet("background-color: red; color: black") self.record.clicked.connect(self.toggle_recording) - + def toggle_acquisition(self): - if self.controller.digitiser.isAcquiring: + ''' + Toggles aquisition by calling appropriate controller member function and updating + local acquiring flag. Calls controller member functions since this code runs on the + main thread (not the AcquisitionWorker thread). + ''' + if self.acquiring: logging.info('Stopping acquisition...') self.start_stop.setText("Start") self.start_stop.setStyleSheet("background-color: green; color: black") + self.acquiring = False # stop the acquisition - self.controller.digitiser.stop_acquisition() + self.controller.stop_acquisition() else: logging.info('Starting acquisition...') self.start_stop.setText("Stop") self.start_stop.setStyleSheet("background-color: red; color: white") - self.controller.digitiser.isAcquiring = True + self.acquiring = True # start the acquisition self.controller.start_acquisition() @@ -161,4 +176,4 @@ def toggle_recording(self): ''' if digitiser exists, must force digitiser.isAcquiring then enables digitiser.isRecording also - ''' \ No newline at end of file + '''