Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 44 additions & 23 deletions pyftdi/i2c.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,20 +310,22 @@ def read(self, with_output: bool = False) -> int:
"""
return self._controller.read_gpio(with_output)

def write(self, value: int) -> None:
def write(self, value: int, with_input: bool = True) -> None:
"""Write GPIO port.

:param value: the GPIO port pins as a bitfield
:with_input: do not error when writing to inputs
"""
return self._controller.write_gpio(value)
return self._controller.write_gpio(value, with_input)

def set_direction(self, pins: int, direction: int) -> None:
def set_direction(self, pins: int, direction: int,
immediate: bool = True) -> None:
"""Change the direction of the GPIO pins.

:param pins: which GPIO pins should be reconfigured
:param direction: direction bitfield (high level for output)
"""
self._controller.set_gpio_direction(pins, direction)
self._controller.set_gpio_direction(pins, direction, immediate)


I2CTimings = namedtuple('I2CTimings', 't_hd_sta t_su_sta t_su_sto t_buf')
Expand Down Expand Up @@ -379,6 +381,7 @@ def __init__(self):
self._gpio_dir = 0
self._gpio_low = 0
self._gpio_mask = 0
self._gpio_last = 0
self._i2c_mask = 0
self._wide_port = False
self._slaves = {}
Expand Down Expand Up @@ -485,11 +488,15 @@ def configure(self, url: Union[str, UsbDevice],
# until the device is open, there is no way to tell if it has a
# wide (16) or narrow port (8). Lower API can deal with any, so
# delay any truncation till the device is actually open
self._set_gpio_direction(16, io_out, io_dir)
gpio_mask = (1 << 16) - 1 # set mask for wide port (16-bit)
gpio_mask &= ~self._i2c_mask
self._gpio_mask = gpio_mask
self._set_gpio_direction(io_out, io_dir)
# as 3-phase clock frequency mode is required for I2C mode, the
# FTDI clock should be adapted to match the required frequency.
kwargs['direction'] = self.I2C_DIR | self._gpio_dir
kwargs['initial'] = self.IDLE | (io_out & self._gpio_mask)
self._gpio_last = self.IDLE | (io_out & self._gpio_mask)
kwargs['initial'] = self._gpio_last
kwargs['frequency'] = (3.0*frequency)/2.0
if not isinstance(url, str):
frequency = self._ftdi.open_mpsse_from_device(
Expand All @@ -512,7 +519,10 @@ def configure(self, url: Union[str, UsbDevice],
self._fake_tristate = True
self._wide_port = self._ftdi.has_wide_port
if not self._wide_port:
self._set_gpio_direction(8, io_out & 0xFF, io_dir & 0xFF)
gpio_mask = (1 << 8) - 1 # set mask for narrow port (8-bit)
gpio_mask &= ~(self._i2c_mask & 0xFF)
self._gpio_mask = gpio_mask
self._set_gpio_direction(io_out & 0xFF, io_dir & 0xFF)

def force_clock_mode(self, enable: bool) -> None:
"""Force unsupported I2C clock signalling on devices that have no I2C
Expand Down Expand Up @@ -542,12 +552,12 @@ def close(self, freeze: bool = False) -> None:
if self._ftdi.is_connected:
self._ftdi.close(freeze)

def terminate(self) -> None:
def terminate(self, freeze: bool = False) -> None:
"""Close the FTDI interface.

:note: deprecated API, use close()
"""
self.close()
self.close(freeze)

def get_port(self, address: int) -> I2cPort:
"""Obtain an I2cPort to drive an I2c slave.
Expand Down Expand Up @@ -631,7 +641,7 @@ def direction(self) -> int:

@property
def gpio_pins(self) -> int:
"""Report the configured GPIOs as a bitfield.
"""Report the available GPIOs as a bitfield.

A true bit represents a GPIO, a false bit a reserved or not
configured pin.
Expand Down Expand Up @@ -902,44 +912,55 @@ def read_gpio(self, with_output: bool = False) -> int:
value &= ~self._gpio_dir
return value

def write_gpio(self, value: int) -> None:
def write_gpio(self, value: int, with_input: bool = True) -> None:
"""Write GPIO port.

:param value: the GPIO port pins as a bitfield
"""
with self._lock:
if (value & self._gpio_dir) != value:
if (value & self._gpio_dir) != value and not with_input:
raise I2cIOError('No such GPO pins: %04x/%04x' %
(self._gpio_dir, value))
# perform read-modify-write
use_high = self._wide_port and (self.direction & 0xff00)
data = self._read_raw(use_high)
data &= ~self._gpio_mask
data |= value
data &= ~self._gpio_mask # removes gpio data, leaves I2C data
data |= value # combine I2C data with new gpio value
self._write_raw(data, use_high)
self._gpio_low = data & 0xFF & ~self._i2c_mask
self._gpio_last = data & self._gpio_mask # last user gpio

def set_gpio_direction(self, pins: int, direction: int) -> None:
def set_gpio_direction(self, pins: int, direction: int,
immediate: bool = True) -> None:
"""Change the direction of the GPIO pins.

:param pins: which GPIO pins should be reconfigured
:param direction: direction bitfield (on for output)
:param immediate: force update the pin states NOW, otherwise
waits for next write_gpio command
"""
with self._lock:
self._set_gpio_direction(16 if self._wide_port else 8,
pins, direction)

def _set_gpio_direction(self, width: int, pins: int,
self._set_gpio_direction(pins, direction)
if immediate:
# perform read-without_modify-write to force new pins
use_high = self._wide_port and (self.direction & 0xff00)
data = self._read_raw(use_high)
data &= ~self._gpio_mask # removes gpio data, leaves I2C data
data |= self._gpio_last # combine I2C data with last user gpio
self._write_raw(data, use_high)
self._gpio_low = data & 0xFF & ~self._i2c_mask

def _set_gpio_direction(self, pins: int,
direction: int) -> None:
if pins & self._i2c_mask:
raise I2cIOError('Cannot access I2C pins as GPIO')
gpio_mask = (1 << width) - 1
gpio_mask &= ~self._i2c_mask
if (pins & gpio_mask) != pins:
# gpio_mask = (1 << width) - 1
# gpio_mask &= ~self._i2c_mask
if (pins & self._gpio_mask) != pins:
raise I2cIOError('No such GPIO pin(s)')
self._gpio_dir &= ~pins
self._gpio_dir |= (pins & direction)
self._gpio_mask = gpio_mask & pins
# self._gpio_mask = gpio_mask & pins

@property
def _data_lo(self) -> Tuple[int]:
Expand Down
66 changes: 43 additions & 23 deletions pyftdi/spi.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,20 +308,22 @@ def read(self, with_output: bool = False) -> int:
"""
return self._controller.read_gpio(with_output)

def write(self, value: int) -> None:
def write(self, value: int, with_input: bool = True) -> None:
"""Write GPIO port.

:param value: the GPIO port pins as a bitfield
:with_input: do not error when writing to inputs
"""
return self._controller.write_gpio(value)
return self._controller.write_gpio(value, with_input)

def set_direction(self, pins: int, direction: int) -> None:
def set_direction(self, pins: int, direction: int,
immediate: bool = True) -> None:
"""Change the direction of the GPIO pins.

:param pins: which GPIO pins should be reconfigured
:param direction: direction bitfield (high level for output)
"""
self._controller.set_gpio_direction(pins, direction)
self._controller.set_gpio_direction(pins, direction, immediate)


class SpiController:
Expand All @@ -347,6 +349,7 @@ def __init__(self, cs_count: int = 1, turbo: bool = True):
self._gpio_port = None
self._gpio_dir = 0
self._gpio_mask = 0
self._gpio_last = 0
self._gpio_low = 0
self._wide_port = False
self._cs_count = cs_count
Expand Down Expand Up @@ -429,9 +432,13 @@ def configure(self, url: Union[str, UsbDevice],
# until the device is open, there is no way to tell if it has a
# wide (16) or narrow port (8). Lower API can deal with any, so
# delay any truncation till the device is actually open
self._set_gpio_direction(16, (~self._spi_mask) & 0xFFFF, io_dir)
gpio_mask = (1 << 16) - 1 # set mask for wide port (16-bit)
gpio_mask &= ~self._spi_mask
self._gpio_mask = gpio_mask
self._set_gpio_direction(io_out, io_dir)
kwargs['direction'] = self._spi_dir | self._gpio_dir
kwargs['initial'] = self._cs_bits | (io_out & self._gpio_mask)
self._gpio_last = self._cs_bits | (io_out & self._gpio_mask)
kwargs['initial'] = self._gpio_last
if not isinstance(url, str):
self._frequency = self._ftdi.open_mpsse_from_device(
url, interface=interface, **kwargs)
Expand All @@ -440,7 +447,10 @@ def configure(self, url: Union[str, UsbDevice],
self._ftdi.enable_adaptive_clock(False)
self._wide_port = self._ftdi.has_wide_port
if not self._wide_port:
self._set_gpio_direction(8, io_out & 0xFF, io_dir & 0xFF)
gpio_mask = (1 << 8) - 1 # set mask for narrow port (8-bit)
gpio_mask &= ~(self._spi_mask & 0xFF)
self._gpio_mask = gpio_mask
self._set_gpio_direction(io_out & 0xFF, io_dir & 0xFF)

def close(self, freeze: bool = False) -> None:
"""Close the FTDI interface.
Expand All @@ -453,12 +463,12 @@ def close(self, freeze: bool = False) -> None:
self._ftdi.close(freeze)
self._frequency = 0.0

def terminate(self) -> None:
def terminate(self, freeze: bool = False) -> None:
"""Close the FTDI interface.

:note: deprecated API, use close()
"""
self.close()
self.close(freeze)

def get_port(self, cs: int, freq: Optional[float] = None,
mode: int = 0) -> SpiPort:
Expand Down Expand Up @@ -560,7 +570,7 @@ def active_channels(self) -> Set[int]:

@property
def gpio_pins(self):
"""Report the configured GPIOs as a bitfield.
"""Report the available GPIOs as a bitfield.

A true bit represents a GPIO, a false bit a reserved or not
configured pin.
Expand Down Expand Up @@ -672,44 +682,54 @@ def read_gpio(self, with_output: bool = False) -> int:
value &= ~self._gpio_dir
return value

def write_gpio(self, value: int) -> None:
def write_gpio(self, value: int, with_input: bool = True) -> None:
"""Write GPIO port

:param value: the GPIO port pins as a bitfield
"""
with self._lock:
if (value & self._gpio_dir) != value:
if (value & self._gpio_dir) != value and not with_input:
raise SpiIOError('No such GPO pins: %04x/%04x' %
(self._gpio_dir, value))
# perform read-modify-write
use_high = self._wide_port and (self.direction & 0xff00)
data = self._read_raw(use_high)
data &= ~self._gpio_mask
data |= value
data &= ~self._gpio_mask # removes gpio data, leaves SPI data
data |= value # combine SPI data with new gpio value
self._write_raw(data, use_high)
self._gpio_low = data & 0xFF & ~self._spi_mask

def set_gpio_direction(self, pins: int, direction: int) -> None:
def set_gpio_direction(self, pins: int, direction: int,
immediate: bool = True) -> None:
"""Change the direction of the GPIO pins

:param pins: which GPIO pins should be reconfigured
:param direction: direction bitfield (on for output)
:param immediate: force update the pin states NOW, otherwise
waits for next write_gpio command
"""
with self._lock:
self._set_gpio_direction(16 if self._wide_port else 8,
pins, direction)

def _set_gpio_direction(self, width: int, pins: int,
self._set_gpio_direction(pins, direction)
if immediate:
# perform read-without_modify-write to force new pins
use_high = self._wide_port and (self.direction & 0xff00)
data = self._read_raw(use_high)
data &= ~self._gpio_mask # removes gpio data, leaves I2C data
data |= self._gpio_last # combine I2C data with last user gpio
self._write_raw(data, use_high)
self._gpio_low = data & 0xFF & ~self._spi_mask

def _set_gpio_direction(self, pins: int,
direction: int) -> None:
if pins & self._spi_mask:
raise SpiIOError('Cannot access SPI pins as GPIO')
gpio_mask = (1 << width) - 1
gpio_mask &= ~self._spi_mask
if (pins & gpio_mask) != pins:
# gpio_mask = (1 << width) - 1
# gpio_mask &= ~self._spi_mask
if (pins & self._gpio_mask) != pins:
raise SpiIOError('No such GPIO pin(s)')
self._gpio_dir &= ~pins
self._gpio_dir |= (pins & direction)
self._gpio_mask = gpio_mask & pins
# self._gpio_mask = gpio_mask & pins

def _read_raw(self, read_high: bool) -> int:
if not self._ftdi.is_connected:
Expand Down