diff --git a/systole/recording.py b/systole/recording.py index df9b8d36..43a04966 100644 --- a/systole/recording.py +++ b/systole/recording.py @@ -748,3 +748,172 @@ def read(self, duration): def close(self): """Close TCPIP connections""" self.con.close() + + + + + +class Nonin3231USB: + """Recording Nonin 3231 USB HR signal through USB connection + + Parameters + ---------- + + serial : pySerial object + The `serial` instance interfacing with the USB port. + + + Examples + -------- + First, you will need to define a :py:func:`serial` instance, indexing the + USB port where the Nonin 3231 Pulse Oximeter is plugged. + + >>> import serial + >>> ser = serial.Serial('COM4') + + This instance is then used to create an :py:func:`Nonin3231USB` instance + that will be used for the recording from a Nonin 3231 USB device. + + >>> from ecg.recording import Nonin3231USB + >>> exg = Nonin3231USB(serial).read(30) + + Use the :py:func:`read` method to record some signal and save it in the + `exg` dictionary. + + .. warning:: The signals received fom the host are appened to a list. This + process can require more time at each iteration as the signal length + increase in memory. You should alway make sure that this will not + interfer with other task and regularly save intermediate recording to + save resources. + + Notes + ----- + """ + + def __init__( + self, + serial, + add_channels: Optional[int] = 1, + ): + self.reset(serial, add_channels) + + def reset( + self, + serial, + add_channels: Optional[int] = 1, + ): + """Initialize/restart the recording instance. + + Parameters + ---------- + serial : pySerial object + The `serial` instance interfacing with the USB port. + + Returns + ------- + Nonin3231USB instance. + """ + self.serial = serial + + # Initialize recording with empty lists + self.recording: List[float] = [] + self.bpm: List[float] = [] + self.SpO2: List[float] = [] + self.times: List[float] = [] + self.n_channels: Optional[int] = add_channels + if add_channels is not None: + self.channels: Optional[Dict[str, List]] = {} + for i in range(add_channels): + self.channels[f"Channel_{i}"] = [] + else: + self.channels = None + + # print('channels: '+str(self.channels)) + + return self + + def setup(self): + self.reset( + serial=self.serial, + ) + + return self + + def read(self, duration: float): + """Read PPG signal for some amount of time. + + Parameters + ---------- + duration : int or float + Length of the desired recording time. + """ + # init start ime + tstart = time.time() + + # read for length of duration + while time.time() - tstart < duration: + # assert one full line of data is there + if self.serial.inWaiting() >= 12: + # Store line of data + data = list(self.serial.readline()) + # assert full data line + if len(data) < 12: + continue + # get bpm reading + self.bpm.append(data[8]) + # give bpm to recording as well + self.recording.append(data[8]) + # get SpO2 reading (we don't use it) + self.SpO2.append(data[6]) + # get 'time' reading (seconds) + self.times.append(data[5]) + # Add 0 to the additional channels + if self.channels is not None: + for ch in self.channels: + self.channels[ch].append(0) + + return self + + + def readInWaiting(self, bool = False): + """Read in waiting Nonin data. + + Parameters + ---------- + stop : bool + Stop the recording when an error is detected. Default is *False*. + """ + + # nonin device reads out 12 bits per message + while self.serial.inWaiting() >= 12: + + # Store full message line + data = list(self.serial.readline()) + # assert that there is a full line of data + if len(data) < 12: + continue + # get bpm reading + self.bpm.append(data[8]) + # give bpm to recording as well + self.recording.append(data[8]) + # get SpO2 reading (we don't use it) + self.SpO2.append(data[6]) + # get 'time' reading (seconds) + self.times.append(data[5]) + # Add 0 to the additional channels for triggers + if self.channels is not None: + for ch in self.channels: + self.channels[ch].append(0) + + + + def save(self, fname: str): + """ + Not saving the data as of now, only results + """ + None + + + def close(self): + """Close serial connections""" + self.serial.close()