Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 51 additions & 5 deletions pyftdi/spi.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from logging import getLogger
from struct import calcsize as scalc, pack as spack, unpack as sunpack
from threading import Lock
from typing import Any, Iterable, Mapping, Optional, Set, Union
from typing import Any, Iterable, List, Mapping, Optional, Set, Union
from usb.core import Device as UsbDevice
from .ftdi import Ftdi, FtdiError

Expand Down Expand Up @@ -160,7 +160,10 @@ def set_mode(self, mode: int, cs_hold: Optional[int] = None) -> None:
self._cpha = bool(mode & 0x1)
cs_clock = 0xFF & ~((int(not self._cpol) and SpiController.SCK_BIT) |
SpiController.DO_BIT)
cs_select = 0xFF & ~((SpiController.CS_BIT << self._cs) |
cs_bit_sel = ~((self._controller._cs_idle
^ (SpiController.CS_BIT << self._cs))
& self._controller._cs_bits)
cs_select = 0xFF & ~(cs_bit_sel |
(int(not self._cpol) and SpiController.SCK_BIT) |
SpiController.DO_BIT)
self._cs_prolog = bytes([cs_clock, cs_select])
Expand Down Expand Up @@ -217,6 +220,14 @@ def cs(self) -> int:
"""
return self._cs

@property
def cs_act_hi(self) -> bool:
"""Is the CS line active high?

:return: the CS polarity configuration
"""
return self._cs in self._controller.cs_act_hi

@property
def mode(self) -> int:
"""Return the current SPI mode.
Expand Down Expand Up @@ -355,6 +366,8 @@ def __init__(self, cs_count: int = 1, turbo: bool = True):
self._frequency = 0.0
self._clock_phase = False
self._cs_bits = 0
self._cs_idle = 0
self._cs_act_hi = 0
self._spi_ports = []
self._spi_dir = 0
self._spi_mask = self.SPI_BITS
Expand Down Expand Up @@ -384,6 +397,8 @@ def configure(self, url: Union[str, UsbDevice],
frequency.
* ``cs_count`` count of chip select signals dedicated to select
SPI slave devices, starting from A*BUS3 pin
* ``cs_act_hi`` an iterable specifying which SPI CS lines are active
high. Example: ``cs_act_hi=(0, 3)``.
* ``turbo`` whether to enable or disable turbo mode
* ``debug`` to increase log verbosity, using MPSSE tracer
"""
Expand All @@ -396,6 +411,9 @@ def configure(self, url: Union[str, UsbDevice],
if not 1 <= self._cs_count <= 5:
raise ValueError('Unsupported CS line count: %d' %
self._cs_count)
if 'cs_act_hi' in kwargs:
self._parse_cs_active_high_arg(kwargs['cs_act_hi'])
del kwargs['cs_act_hi']
if 'turbo' in kwargs:
self._turbo = bool(kwargs['turbo'])
del kwargs['turbo']
Expand All @@ -421,6 +439,8 @@ def configure(self, url: Union[str, UsbDevice],
raise SpiIOError('Already configured')
self._cs_bits = (((SpiController.CS_BIT << self._cs_count) - 1) &
~(SpiController.CS_BIT - 1))
self._cs_act_hi &= self._cs_bits
self._cs_idle = (~self._cs_act_hi) & self._cs_bits
self._spi_ports = [None] * self._cs_count
self._spi_dir = (self._cs_bits |
SpiController.DO_BIT |
Expand All @@ -431,7 +451,7 @@ def configure(self, url: Union[str, UsbDevice],
# delay any truncation till the device is actually open
self._set_gpio_direction(16, (~self._spi_mask) & 0xFFFF, io_dir)
kwargs['direction'] = self._spi_dir | self._gpio_dir
kwargs['initial'] = self._cs_bits | (io_out & self._gpio_mask)
kwargs['initial'] = self._cs_idle | (io_out & self._gpio_mask)
if not isinstance(url, str):
self._frequency = self._ftdi.open_mpsse_from_device(
url, interface=interface, **kwargs)
Expand Down Expand Up @@ -558,6 +578,20 @@ def active_channels(self) -> Set[int]:
"""
return {port[0] for port in enumerate(self._spi_ports) if port[1]}

@property
def cs_act_hi(self) -> List[int]:
"""Provide the active high CS lines.

:return: List of channel numbers that are active high
"""
result = []
mask = 0x08
for i in range(self._cs_count):
if self._cs_act_hi & mask:
result.append(i)
mask <<= 1
return result

@property
def gpio_pins(self):
"""Report the configured GPIOs as a bitfield.
Expand Down Expand Up @@ -711,6 +745,18 @@ def _set_gpio_direction(self, width: int, pins: int,
self._gpio_dir |= (pins & direction)
self._gpio_mask = gpio_mask & pins

def _parse_cs_active_high_arg(self,
cs_lines_hi: Iterable[int]) -> None:
bad_args = []
for cs_line in cs_lines_hi:
if not 0 <= cs_line < self._cs_count:
bad_args.append(cs_line)
mask = 1 << (cs_line + 3)
self._cs_act_hi |= mask
if bad_args:
raise SpiIOError('Invalid active high CS lines: '
+ ', '.join([str(i) for i in bad_args]))

def _read_raw(self, read_high: bool) -> int:
if not self._ftdi.is_connected:
raise SpiIOError("FTDI controller not initialized")
Expand Down Expand Up @@ -800,7 +846,7 @@ def _exchange_half_duplex(self, frequency: float,
ctrl |= self._gpio_low
epilog.extend((Ftdi.SET_BITS_LOW, ctrl, direction))
# Restore idle state
cs_high = [Ftdi.SET_BITS_LOW, self._cs_bits | self._gpio_low,
cs_high = [Ftdi.SET_BITS_LOW, self._cs_idle | self._gpio_low,
direction]
if not self._turbo:
cs_high.append(Ftdi.SEND_IMMEDIATE)
Expand Down Expand Up @@ -907,7 +953,7 @@ def _exchange_full_duplex(self, frequency: float,
ctrl |= self._gpio_low
epilog.extend((Ftdi.SET_BITS_LOW, ctrl, direction))
# Restore idle state
cs_high = [Ftdi.SET_BITS_LOW, self._cs_bits | self._gpio_low,
cs_high = [Ftdi.SET_BITS_LOW, self._cs_idle | self._gpio_low,
direction]
if not self._turbo:
cs_high.append(Ftdi.SEND_IMMEDIATE)
Expand Down
29 changes: 29 additions & 0 deletions pyftdi/tests/spi.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,13 +361,42 @@ def test_cs_default_pulse_rev_clock(self):
for _ in range(5):
self._port.force_select()

class SpiCsActiveHighTestCase(unittest.TestCase):
"""Basic test for checking CS active high function

It requires a scope or a digital analyzer to validate the signal
waveforms.
"""

@classmethod
def setUpClass(cls):
cls.url = environ.get('FTDI_DEVICE', 'ftdi:///1')
cls.debug = to_bool(environ.get('FTDI_DEBUG', 'off'))

def setUp(self):
self._spi = SpiController(cs_count=1)
self._spi.configure(self.url, debug=self.debug, cs_count=2,
cs_act_hi=[1])
self._port0 = self._spi.get_port(0, freq=1E6, mode=0)
self._port1 = self._spi.get_port(1, freq=1E6, mode=0)

def tearDown(self):
"""Close the SPI connection"""
self._spi.terminate()

def test_cs_config(self):
self.assertEqual(self._spi.cs_act_hi, [1])
self.assertFalse(self._port0.cs_act_hi)
self.assertTrue(self._port1.cs_act_hi)


def suite():
suite_ = unittest.TestSuite()
# suite_.addTest(unittest.makeSuite(SpiTestCase, 'test'))
# suite_.addTest(unittest.makeSuite(SpiGpioTestCase, 'test'))
suite_.addTest(unittest.makeSuite(SpiUnalignedTestCase, 'test'))
suite_.addTest(unittest.makeSuite(SpiCsForceTestCase, 'test'))
# suite_.addTest(unittest.makeSuite(SpiCsActiveHighTestCase, 'test'))
return suite_


Expand Down