Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions core/commands.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from enum import Enum, auto
from dataclasses import dataclass

class CommandType(Enum):
START = 0
STOP = 1
CONNECT = auto()
UPDATE = auto()
CH_DISPLAY = auto()

@dataclass
class Command:
type: CommandType
args: tuple = ()

215 changes: 70 additions & 145 deletions core/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,15 @@

from core.io import read_config_file
from core.logging import setup_logging
from core.commands import CommandType, Command
from core.worker import AcquisitionWorker
from core.tracker import Tracker
from felib.digitiser import Digitiser
from ui import oscilloscope

from threading import Thread, Event, Lock
from queue import Queue, Empty

class Controller:
def __init__(self,
dig_config: Optional[str] = None,
Expand All @@ -37,24 +43,30 @@ def __init__(self,
# Initialise logging and tracking
setup_logging()
self.tracker = Tracker()
logging.info("Controller initialising.")

# digitiser connection first
# Digitiser configuration
self.dig_config = dig_config
self.rec_config = rec_config

if dig_config is None:
logging.warning("No digitiser configuration file provided. Digitiser will not be connected.")
self.digitiser = None
else:
self.digitiser = self.connect_digitiser()
# Thread-safe communication channels
self.cmd_buffer = Queue(maxsize=10)
self.display_buffer = Queue(maxsize=1024)
self.stop_event = Event()

# check digitiser connection, if valid set isConnected to True
if self.digitiser is not None:
self.digitiser.isConnected = True
logging.info(f"Digitiser connected: {self.digitiser.URI}")
else:
logging.warning("Digitiser not connected.")
# Acquisition worker
self.worker = AcquisitionWorker(
cmd_buffer=self.cmd_buffer,
display_buffer=self.display_buffer,
stop_event=self.stop_event,
)

# Set the callback to the controller's data_handling method
self.worker.data_ready_callback = self.data_handling

# Start thread and log
self.worker.start()
logging.info("Acquisition worker thread started.")

# gui second
self.app = QApplication([])
Expand All @@ -64,50 +76,31 @@ def __init__(self,
self.fps_timer.timeout.connect(self.update_fps)
self.spf = 1 # seconds per frame

self.connect_digitiser()

# worker third
if self.digitiser is not None and self.digitiser.isConnected:
self.initialise_worker()

def initialise_worker(self):
def data_handling(self):
'''
Initialise the worker thread.
This in turn should begin the data collection (I think?)
Visualise data.
'''
while True:
try:
# non-blocking read from display queue
data = self.display_buffer.get_nowait()
except Empty:
break

# create thread to manage data output
self.worker_wait_condition = QWaitCondition()
self.acquisition_worker = AcquisitionWorker(self.worker_wait_condition, digitiser = self.digitiser)
self.acquisition_thread = QThread()
self.acquisition_worker.moveToThread(self.acquisition_thread)
self.acquisition_thread.started.connect(self.acquisition_worker.run)
self.acquisition_worker.data_ready.connect(self.data_handling)
self.acquisition_thread.start()


def data_handling(self):
# visualise (and at some point, collect in a file)
try:
wf_size, ADCs = self.acquisition_worker.data
except TypeError:
# type error occurs when recording in digitiser fails, so no error output here please!
return
except Exception as e:
logging.exception("Error in data_handling(): ")


try:
wf_size, ADCs = data

# save the data (PUT IT HERE)
# update visuals
self.main_window.screen.update_ch(np.arange(0, wf_size, dtype=wf_size.dtype), ADCs)

# ping the tracker (make this optional)
self.tracker.track(ADCs.nbytes)

# update visuals
self.main_window.screen.update_ch(np.arange(0, wf_size, dtype=wf_size.dtype), ADCs)

# ping the tracker (make this optional)
self.tracker.track(ADCs.nbytes)

# prep the next thread
if self.digitiser.isAcquiring:
self.worker_wait_condition.notify_one()
except Exception as e:
logging.exception(f"Error updating display: {e}")


def update_fps(self):
Expand All @@ -126,112 +119,44 @@ def connect_digitiser(self):
Connect to the digitiser using the provided configuration file.
This is a placeholder function and should be replaced with actual
digitiser connection logic.

Need to allow for changing config files after initial application launch.
'''

# Load in configs
dig_dict = read_config_file(self.dig_config)
rec_dict = read_config_file(self.rec_config)

if dig_dict is None:
logging.error("Digitiser configuration file not found or invalid.")
#raise ValueError("Digitiser configuration file not found or invalid.")
else:
digitiser = Digitiser(dig_dict)
digitiser.connect()
# Only add to the main window if it exists
if hasattr(self, 'main_window'):
self.main_window.control_panel.acquisition.update()

# once connected, configure recording setup
if rec_dict is None:
logging.warning("No recording configuration file provided.")
else:
if (digitiser is not None) and digitiser.isConnected:
digitiser.configure(dig_dict, rec_dict)
return digitiser

# Load in new configs
# self.dig_dict = some other dig_config
# self.rec_dict = some other rec_config

self.cmd_buffer.put(Command(CommandType.CONNECT, (self.dig_config, self.rec_config)))

# Only add to the main window if it exists
if hasattr(self, 'main_window'):
self.main_window.control_panel.acquisition.update()


def start_acquisition(self):
'''
Start the acquisition in multiple steps:
- Start the digitiser acquisition based on whatever trigger
settings are applied,
- Initialise the data reader,
- Initialise the output visuals.
Start digitiser acquisition.
'''
try:
self.digitiser.start_acquisition()
self.worker_wait_condition.wakeAll()
except Exception as e:
logging.exception('Failed to start acquisition.')
#self.digitiser.start_acquisition()
#self.trigger_and_record()
logging.info("Starting acquisition.")
self.cmd_buffer.put(Command(CommandType.START))

def stop_acquisition(self):
'''
Simple stopping of acquisition, this will end the AcquisitionWorkers loop and terminate
Stop digitiser acquisition.
'''
self.digitiser.isAcquiring = False


class AcquisitionWorker(QObject):
logging.info("Stopping acquisition.")
self.cmd_buffer.put(Command(CommandType.STOP))

data_ready = Signal()

def __init__(self, wait_condition, digitiser, parent=None):
super().__init__(parent=parent)
self.wait_condition = wait_condition
self.digitiser = digitiser
self.mutex = QMutex()
# ensure on initial startup that you're not acquiring.
self.digitiser.isAcquiring = False


def run(self):



while True:
self.mutex.lock()
if not self.digitiser.isAcquiring:
self.wait_condition.wait(self.mutex)
self.mutex.unlock()


self.data = self.digitiser.acquire()
self.data_ready.emit()

self.stop()

def stop(self):
self.digitiser.stop_acquisition()
self.wait_condition.wakeAll()


class Tracker:
'''
Tracking class that keeps track of:
- number of collected events
- speed at which data is being collected
'''

def __init__(self):
self.start_time = time.perf_counter()
self.bytes_ps = 0
self.events_ps = 0
self.last_time = self.start_time

def track(self, nbytes: int = 0):
def shutdown(self):
'''
Tracker outputting the number of events that arrive per second
Carefully shut down acquisition and worker thread.
'''
self.events_ps += 1
self.bytes_ps += nbytes

t_check = time.perf_counter()
if t_check - self.last_time >= 1.0:
MB = self.bytes_ps / 1000000
logging.info(f'|| {self.events_ps} events/sec || {MB:.2f} MB/sec ||')
self.last_time = t_check
self.bytes_ps = 0
self.events_ps = 0
logging.info("Shutting down controller.")
self.cmd_buffer.put(Command(CommandType.EXIT))
self.stop_event.set()
self.worker.join(timeout=2)
if self.worker.is_alive():
logging.warning("AcquisitionWorker did not stop cleanly.")
else:
logging.info("Controller shutdown complete.")
33 changes: 33 additions & 0 deletions core/tracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import logging
import time
from threading import Lock

class Tracker:
'''
Tracking class that keeps track of:
- number of collected events
- speed at which data is being collected
'''

def __init__(self):
self.start_time = time.perf_counter()
self.bytes_ps = 0
self.events_ps = 0
self.last_time = self.start_time
self.lock = Lock()

def track(self, nbytes: int = 0):
'''
Tracker outputting the number of events that arrive per second
'''
with self.lock:
self.events_ps += 1
self.bytes_ps += nbytes

t_check = time.perf_counter()
if t_check - self.last_time >= 1.0:
MB = self.bytes_ps / 1000000
logging.info(f'|| {self.events_ps} events/sec || {MB:.2f} MB/sec ||')
self.last_time = t_check
self.bytes_ps = 0
self.events_ps = 0
Loading
Loading