diff --git a/pyftdi/spi.py b/pyftdi/spi.py index a24d828e..0e30f8d3 100644 --- a/pyftdi/spi.py +++ b/pyftdi/spi.py @@ -17,7 +17,7 @@ from logging import getLogger from struct import calcsize as scalc, pack as spack, unpack as sunpack from threading import Lock -from typing import Any, Iterable, Mapping, Optional, Set, Union +from typing import Any, Iterable, List, Mapping, Optional, Set, Union from usb.core import Device as UsbDevice from .ftdi import Ftdi, FtdiError @@ -160,7 +160,10 @@ def set_mode(self, mode: int, cs_hold: Optional[int] = None) -> None: self._cpha = bool(mode & 0x1) cs_clock = 0xFF & ~((int(not self._cpol) and SpiController.SCK_BIT) | SpiController.DO_BIT) - cs_select = 0xFF & ~((SpiController.CS_BIT << self._cs) | + cs_bit_sel = ~((self._controller._cs_idle + ^ (SpiController.CS_BIT << self._cs)) + & self._controller._cs_bits) + cs_select = 0xFF & ~(cs_bit_sel | (int(not self._cpol) and SpiController.SCK_BIT) | SpiController.DO_BIT) self._cs_prolog = bytes([cs_clock, cs_select]) @@ -217,6 +220,14 @@ def cs(self) -> int: """ return self._cs + @property + def cs_act_hi(self) -> bool: + """Is the CS line active high? + + :return: the CS polarity configuration + """ + return self._cs in self._controller.cs_act_hi + @property def mode(self) -> int: """Return the current SPI mode. @@ -355,6 +366,8 @@ def __init__(self, cs_count: int = 1, turbo: bool = True): self._frequency = 0.0 self._clock_phase = False self._cs_bits = 0 + self._cs_idle = 0 + self._cs_act_hi = 0 self._spi_ports = [] self._spi_dir = 0 self._spi_mask = self.SPI_BITS @@ -384,6 +397,8 @@ def configure(self, url: Union[str, UsbDevice], frequency. * ``cs_count`` count of chip select signals dedicated to select SPI slave devices, starting from A*BUS3 pin + * ``cs_act_hi`` an iterable specifying which SPI CS lines are active + high. Example: ``cs_act_hi=(0, 3)``. * ``turbo`` whether to enable or disable turbo mode * ``debug`` to increase log verbosity, using MPSSE tracer """ @@ -396,6 +411,9 @@ def configure(self, url: Union[str, UsbDevice], if not 1 <= self._cs_count <= 5: raise ValueError('Unsupported CS line count: %d' % self._cs_count) + if 'cs_act_hi' in kwargs: + self._parse_cs_active_high_arg(kwargs['cs_act_hi']) + del kwargs['cs_act_hi'] if 'turbo' in kwargs: self._turbo = bool(kwargs['turbo']) del kwargs['turbo'] @@ -421,6 +439,8 @@ def configure(self, url: Union[str, UsbDevice], raise SpiIOError('Already configured') self._cs_bits = (((SpiController.CS_BIT << self._cs_count) - 1) & ~(SpiController.CS_BIT - 1)) + self._cs_act_hi &= self._cs_bits + self._cs_idle = (~self._cs_act_hi) & self._cs_bits self._spi_ports = [None] * self._cs_count self._spi_dir = (self._cs_bits | SpiController.DO_BIT | @@ -431,7 +451,7 @@ def configure(self, url: Union[str, UsbDevice], # delay any truncation till the device is actually open self._set_gpio_direction(16, (~self._spi_mask) & 0xFFFF, io_dir) kwargs['direction'] = self._spi_dir | self._gpio_dir - kwargs['initial'] = self._cs_bits | (io_out & self._gpio_mask) + kwargs['initial'] = self._cs_idle | (io_out & self._gpio_mask) if not isinstance(url, str): self._frequency = self._ftdi.open_mpsse_from_device( url, interface=interface, **kwargs) @@ -558,6 +578,20 @@ def active_channels(self) -> Set[int]: """ return {port[0] for port in enumerate(self._spi_ports) if port[1]} + @property + def cs_act_hi(self) -> List[int]: + """Provide the active high CS lines. + + :return: List of channel numbers that are active high + """ + result = [] + mask = 0x08 + for i in range(self._cs_count): + if self._cs_act_hi & mask: + result.append(i) + mask <<= 1 + return result + @property def gpio_pins(self): """Report the configured GPIOs as a bitfield. @@ -711,6 +745,18 @@ def _set_gpio_direction(self, width: int, pins: int, self._gpio_dir |= (pins & direction) self._gpio_mask = gpio_mask & pins + def _parse_cs_active_high_arg(self, + cs_lines_hi: Iterable[int]) -> None: + bad_args = [] + for cs_line in cs_lines_hi: + if not 0 <= cs_line < self._cs_count: + bad_args.append(cs_line) + mask = 1 << (cs_line + 3) + self._cs_act_hi |= mask + if bad_args: + raise SpiIOError('Invalid active high CS lines: ' + + ', '.join([str(i) for i in bad_args])) + def _read_raw(self, read_high: bool) -> int: if not self._ftdi.is_connected: raise SpiIOError("FTDI controller not initialized") @@ -800,7 +846,7 @@ def _exchange_half_duplex(self, frequency: float, ctrl |= self._gpio_low epilog.extend((Ftdi.SET_BITS_LOW, ctrl, direction)) # Restore idle state - cs_high = [Ftdi.SET_BITS_LOW, self._cs_bits | self._gpio_low, + cs_high = [Ftdi.SET_BITS_LOW, self._cs_idle | self._gpio_low, direction] if not self._turbo: cs_high.append(Ftdi.SEND_IMMEDIATE) @@ -907,7 +953,7 @@ def _exchange_full_duplex(self, frequency: float, ctrl |= self._gpio_low epilog.extend((Ftdi.SET_BITS_LOW, ctrl, direction)) # Restore idle state - cs_high = [Ftdi.SET_BITS_LOW, self._cs_bits | self._gpio_low, + cs_high = [Ftdi.SET_BITS_LOW, self._cs_idle | self._gpio_low, direction] if not self._turbo: cs_high.append(Ftdi.SEND_IMMEDIATE) diff --git a/pyftdi/tests/spi.py b/pyftdi/tests/spi.py index 1753c872..a5d12ab2 100755 --- a/pyftdi/tests/spi.py +++ b/pyftdi/tests/spi.py @@ -361,6 +361,34 @@ def test_cs_default_pulse_rev_clock(self): for _ in range(5): self._port.force_select() +class SpiCsActiveHighTestCase(unittest.TestCase): + """Basic test for checking CS active high function + + It requires a scope or a digital analyzer to validate the signal + waveforms. + """ + + @classmethod + def setUpClass(cls): + cls.url = environ.get('FTDI_DEVICE', 'ftdi:///1') + cls.debug = to_bool(environ.get('FTDI_DEBUG', 'off')) + + def setUp(self): + self._spi = SpiController(cs_count=1) + self._spi.configure(self.url, debug=self.debug, cs_count=2, + cs_act_hi=[1]) + self._port0 = self._spi.get_port(0, freq=1E6, mode=0) + self._port1 = self._spi.get_port(1, freq=1E6, mode=0) + + def tearDown(self): + """Close the SPI connection""" + self._spi.terminate() + + def test_cs_config(self): + self.assertEqual(self._spi.cs_act_hi, [1]) + self.assertFalse(self._port0.cs_act_hi) + self.assertTrue(self._port1.cs_act_hi) + def suite(): suite_ = unittest.TestSuite() @@ -368,6 +396,7 @@ def suite(): # suite_.addTest(unittest.makeSuite(SpiGpioTestCase, 'test')) suite_.addTest(unittest.makeSuite(SpiUnalignedTestCase, 'test')) suite_.addTest(unittest.makeSuite(SpiCsForceTestCase, 'test')) + # suite_.addTest(unittest.makeSuite(SpiCsActiveHighTestCase, 'test')) return suite_