diff --git a/pyftdi/doc/api/spi.rst b/pyftdi/doc/api/spi.rst index 3cf74d18..1ba9cd52 100644 --- a/pyftdi/doc/api/spi.rst +++ b/pyftdi/doc/api/spi.rst @@ -72,6 +72,46 @@ Example: communication with a SPI device and an extra GPIO pin = bool(gpio.read() & 0x20) +Example: communication with a MicroWire 93LC56B data flash (half-duplex, +bi-directional data, active high CS example). + +NOTE: This is the EEPROM used by many FTDI devices. If accessing a +93LC56B attached to a FTDI device, be sure that the FTDI device is +forced into reset by grounding its RESET# signal. + +.. code-block:: python + + # Import SpiController & hexdump + from pyftdi.spi import SpiController + from pyftdi.misc import hexdump + + # Instanciate a SPI controller + mw = SpiController(cs_count=1,cs_act_hi=True) + + # Configure the second interface (IF/2) of the FTDI device as a SPI master + mw.configure('ftdi://ftdi:2232h/2') + + # Get a port to a SPI slave w/ CS on A*BUS3 and SPI mode 0 @ 1MHz, + # bi-directional data (ie. a single data line like I2C) + slave = mw.get_port(cs=0, freq=1E6, mode=0, bidir=True) + + # Read 256 bytes from EEPROM starting at address 0 + addr = 0 + eeprom = slave.exchange([0x06, addr], 256) + + # byte swap to handle data in little endian + eeprom[0::2], eeprom[1::2] = eeprom[1::2], eeprom[0::2] + + # Print contents of eeprom + print(hexdump(eeprom)) + + # Convert to a byte string, if desired + eepromB = eeprom.tobytes() + + # Close the SPI Controller + mw.terminate() + + Classes ~~~~~~~ @@ -97,6 +137,9 @@ SPI sample tests expect: * ADXL345 device on /CS 1, SPI mode 3 * RFDA2125 device on /CS 2, SPI mode 0 +MicroWireSPI sample tests expect: + * 93LC56B device on CS 0, SPI mode 0, bi-directional data + Checkout a fresh copy from PyFtdi_ github repository. See :doc:`../pinout` for FTDI wiring. diff --git a/pyftdi/serialext/tests/pyterm.py b/pyftdi/serialext/tests/pyterm.py old mode 100755 new mode 100644 diff --git a/pyftdi/spi.py b/pyftdi/spi.py index 2bef4650..4ecf31c8 100644 --- a/pyftdi/spi.py +++ b/pyftdi/spi.py @@ -60,16 +60,23 @@ class SpiPort: >>> out.extend(spi.exchange([], 2, False, True)) """ - def __init__(self, controller, cs, cs_hold=3, spi_mode=0): + def __init__(self, controller, cs, cs_hold=3, spi_mode=0, + cs_count=4, cs_act_hi=False, bidir=False): self._controller = controller self._cpol = spi_mode & 0x1 self._cpha = spi_mode & 0x2 cs_clock = 0xFF & ~((int(not self._cpol) and SpiController.SCK_BIT) | SpiController.DO_BIT) - cs_select = 0xFF & ~((SpiController.CS_BIT << cs) | + + cs_bits = (((SpiController.CS_BIT << cs_count) - 1) & + ~(SpiController.CS_BIT - 1)) + + cs_bit_sel = (SpiController.CS_BIT << cs) ^ (cs_bits * cs_act_hi) + cs_select = 0xFF & ~(cs_bit_sel | (int(not self._cpol) and SpiController.SCK_BIT) | SpiController.DO_BIT) self._cs_prolog = bytes([cs_clock, cs_select]) + self._bidir = bidir and cs_select or None self._cs_epilog = bytes([cs_select] + [cs_clock] * int(cs_hold)) self._frequency = self._controller.frequency @@ -95,7 +102,8 @@ def exchange(self, out=b'', readlen=0, start=True, stop=True, Use False if the transaction should complete with a further call to exchange() :param duplex: perform a full-duplex exchange (vs. half-duplex), - i.e. bits are clocked in and out at once. + i.e. bits are clocked in and out at once. + Note: forced to False if self._bidir is not None. :return: an array of bytes containing the data read out from the slave :rtype: array @@ -104,7 +112,8 @@ def exchange(self, out=b'', readlen=0, start=True, stop=True, start and self._cs_prolog, stop and self._cs_epilog, self._cpol, self._cpha, - duplex=duplex) + duplex=(not self._bidir and duplex), + bidir=self._bidir) def read(self, readlen=0, start=True, stop=True): """Read out bytes from the slave @@ -124,7 +133,8 @@ def read(self, readlen=0, start=True, stop=True): return self._controller.exchange(self._frequency, [], readlen, start and self._cs_prolog, stop and self._cs_epilog, - self._cpol, self._cpha) + self._cpol, self._cpha, + bidir=self._bidir) def write(self, out, start=True, stop=True): """Write bytes to the slave @@ -142,7 +152,8 @@ def write(self, out, start=True, stop=True): return self._controller.exchange(self._frequency, out, 0, start and self._cs_prolog, stop and self._cs_epilog, - self._cpol, self._cpha) + self._cpol, self._cpha, + bidir=self._bidir) def flush(self): """Force the flush of the HW FIFOs""" @@ -243,7 +254,8 @@ class SpiController: SPI_BITS = DI_BIT | DO_BIT | SCK_BIT PAYLOAD_MAX_LENGTH = 0x10000 # 16 bits max - def __init__(self, silent_clock=False, cs_count=4, turbo=True): + def __init__(self, silent_clock=False, cs_count=4, turbo=True, + cs_act_hi=False): self._ftdi = Ftdi() self._lock = Lock() self._gpio_port = None @@ -256,6 +268,18 @@ def __init__(self, silent_clock=False, cs_count=4, turbo=True): self._immediate = bytes((Ftdi.SEND_IMMEDIATE,)) self._frequency = 0.0 self._clock_phase = False + self._cs_idle = 0 + + # If True CS, starts LOW and then goes HIGH to select device + # (ie. Active High) + # If False CS, starts HIGH and then goes LOW to select device + # (ie. Active Low) + # + # This is here instead of in get_port, where mode is set, is + # because the initial setting of CS is done in configure(), so + # need to save cs_act_hi here and use it during configure(). + self._cs_act_hi = cs_act_hi + @property def direction(self): @@ -291,16 +315,22 @@ def configure(self, url, **kwargs): with self._lock: if self._frequency > 0.0: raise SpiIOError('Already configured') - self._cs_bits = (((SpiController.CS_BIT << self._cs_count) - 1) & - ~(SpiController.CS_BIT - 1)) + + cs_bits = (((SpiController.CS_BIT << self._cs_count) - 1) & + ~(SpiController.CS_BIT - 1)) + + # If CS is Active Low, CS idle state is all high + # If CS is Active High, CS idle state is all low (0x00) + self._cs_idle = (not self._cs_act_hi) and cs_bits or 0x00 + self._spi_ports = [None] * self._cs_count - self._spi_dir = (self._cs_bits | + self._spi_dir = (cs_bits | SpiController.DO_BIT | SpiController.SCK_BIT) - self._spi_mask = self._cs_bits | self.SPI_BITS + self._spi_mask = cs_bits | self.SPI_BITS self._frequency = self._ftdi.open_mpsse_from_url( - # /CS all high - url, direction=self._spi_dir, initial=self._cs_bits, **kwargs) + # /CS inactive + url, direction=self._spi_dir, initial=self._cs_idle, **kwargs) self._ftdi.enable_adaptive_clock(False) self._wide_port = self._ftdi.has_wide_port @@ -310,7 +340,7 @@ def terminate(self): self._ftdi.close() self._frequency = 0.0 - def get_port(self, cs, freq=None, mode=0): + def get_port(self, cs, freq=None, mode=0, bidir=False): """Obtain a SPI port to drive a SPI device selected by Chip Select. :note: SPI mode 2 is not supported. @@ -318,6 +348,8 @@ def get_port(self, cs, freq=None, mode=0): :param int cs: chip select slot, starting from 0 :param float freq: SPI bus frequency for this slave in Hz :param int mode: SPI mode [0,1,3] + :param bool bidir: If True, Data is a single, bi-directional line; + If False, two uni-directional data lines :rtype: SpiPort """ with self._lock: @@ -337,7 +369,10 @@ def get_port(self, cs, freq=None, mode=0): freq = min(freq or self.frequency_max, self.frequency_max) hold = freq and (1+int(1E6/freq)) self._spi_ports[cs] = SpiPort(self, cs, cs_hold=hold, - spi_mode=mode) + spi_mode=mode, + cs_count=self._cs_count, + cs_act_hi=self._cs_act_hi, + bidir=bidir) self._spi_ports[cs].set_frequency(freq) self._flush() return self._spi_ports[cs] @@ -368,7 +403,8 @@ def gpio_pins(self): def exchange(self, frequency, out, readlen, cs_prolog=None, cs_epilog=None, - cpol=False, cpha=False, duplex=False): + cpol=False, cpha=False, duplex=False, + bidir=None): if duplex: if readlen > len(out): tmp = array('B', out) @@ -385,7 +421,7 @@ def exchange(self, frequency, out, readlen, else: return self._exchange_half_duplex(frequency, out, readlen, cs_prolog, cs_epilog, - cpol, cpha) + cpol, cpha, bidir) def read_gpio(self, with_output=False): """Read GPIO port @@ -468,7 +504,7 @@ def _write_raw(self, data, write_high): self._ftdi.write_data(cmd) def _exchange_half_duplex(self, frequency, out, readlen, - cs_prolog, cs_epilog, cpol, cpha): + cs_prolog, cs_epilog, cpol, cpha, bidir=None): if not self._ftdi: raise SpiIOError("FTDI controller not initialized") if len(out) > SpiController.PAYLOAD_MAX_LENGTH: @@ -493,14 +529,29 @@ def _exchange_half_duplex(self, frequency, out, readlen, ctrl &= self._spi_mask ctrl |= self._gpio_low cmd.extend((Ftdi.SET_BITS_LOW, ctrl, direction)) + if bidir: + # set DO to an input during turnaround for bi-directional + # implementations (single DI/DO line). Must also handle + # gpio in bctrl. + bctrl = bidir + bctrl &= self._spi_mask + bctrl |= self._gpio_low + turnaround = array('B', [Ftdi.SET_BITS_LOW, + bctrl, + direction & (~SpiController.DO_BIT)]) epilog = array('B') if cs_epilog: for ctrl in cs_epilog: ctrl &= self._spi_mask ctrl |= self._gpio_low - epilog.extend((Ftdi.SET_BITS_LOW, ctrl, direction)) + # if bidir in not None and data is to be read (readlen != 0), keep + # the DO Bit as an input + bitdir = (((bidir != None) and readlen) + and (direction & (~SpiController.DO_BIT)) + or direction) + epilog.extend((Ftdi.SET_BITS_LOW, ctrl, bitdir)) # Restore idle state - cs_high = [Ftdi.SET_BITS_LOW, self._cs_bits | self._gpio_low, + cs_high = [Ftdi.SET_BITS_LOW, self._cs_idle | self._gpio_low, direction] if not self._turbo: cs_high.append(Ftdi.SEND_IMMEDIATE) @@ -516,6 +567,8 @@ def _exchange_half_duplex(self, frequency, out, readlen, cmd.frombytes(write_cmd) cmd.extend(out) if readlen: + if bidir: + cmd.extend(turnaround) rcmd = (cpol ^ cpha) and \ Ftdi.READ_BYTES_PVE_MSB or Ftdi.READ_BYTES_NVE_MSB read_cmd = spack(' +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of the Neotion nor the names of its contributors may +# be used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL NEOTION BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, +# OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, +# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +from binascii import hexlify +from doctest import testmod +from os import environ +from pyftdi import FtdiLogger +from pyftdi.ftdi import FtdiError +from pyftdi.spi import SpiController, SpiIOError +from pyftdi.misc import hexdump +from array import array +from sys import modules, stderr, stdout +from time import sleep +import logging +import unittest + + +class MicrowireData93LC56BTest(object): + """Basic class for a Microchip MicroWire 93LC56B data flash device + selected as CS0, SPI mode 0, Active High polarity for CS and + Bi-directional data. MicroWire is very similar to SPI so the + basic SpiController can be used. + + Test setup: UM232H connected to the EEPROM on a FT4232H-56Q + Mini Module. The FT4232H is forced into reset by connecting the + RT# pin (CN2-8) to GND (CN2-6). Then make the following + connections between the boards: + + UM232H FT4232H-56Q + ====== =========== + GND - GND + D0 (CN2-1) - ECL (CN3-6) + D1 (CN2-2) - EDA (CN3-7) + D2 (CN2-3) - EDA (CN3-7) + D3 (CN2-4) - ECS (CN3-5) + + NOTE: D1 & D2 are indeed both tied to the same EDA pin. + + """ + + def __init__(self): + self._mw = SpiController(cs_count=1,cs_act_hi=True) + self._freq = 1E6 + self._mode = 0 + self._bidir = True + + # Maximum number of read cycles to wait while looking for the + # Ready status after each write + self._write_timeout_cnt = 25 + + # According to the datasheet, the maximum write time is 6 ms + self._Twc = 0.006 + + # The opcodes are a full byte to make it easy to use with the + # byte interface of SpiController. These opcodes also include + # the start bit (SB), which is simply the left-most '1' + # bit. The actual 2-bit opcode (OC) follows this start bit. + # + # The instructions ERAL, EWDS, EWEN and WRAL require a special + # address byte to complete the opcode. So they are 2 element + # lists whereas the others are single element lists. + self._SBOC_erase = [0x07] + self._SBOC_eral = [0x04, 0x80] # requires EEPROM Vcc >= 4.5V + self._SBOC_ewds = [0x04, 0x00] + self._SBOC_ewen = [0x04, 0xc0] + self._SBOC_read = [0x06] + self._SBOC_write = [0x05] + self._SBOC_wral = [0x04, 0x40] # requires EEPROM Vcc >= 4.5V + + def open(self): + """Open an MicroWire connection to a slave""" + url = environ.get('FTDI_DEVICE', 'ftdi://ftdi:232h/1') + self._mw.configure(url) + + def read_word(self, addr): + """Read a single 16-bit word from the EEPROM starting at word address, addr""" + + # NOTE: Using SPI Mode 0. This really should have the FTDI + # clock the read bits in on the rising edge, at least based on + # my understanding of SPI. However, spi.py reads the bits on + # the falling edge of the clock. For the 93LC56B, this is + # exactly what we want. However, if spi.py ever gets changed, + # will need to do writes and reads seperately with reads in + # SPI Mode 1. + port = self._mw.get_port(0, freq=self._freq, + mode=self._mode, bidir=self._bidir) + + # byteswap() is to handle little endian data + word = port.exchange(self._SBOC_read+[(addr&0xFF)],2) + word[0],word[1] = word[1],word[0] + return word.tobytes() + + def read_all(self, readlen): + # NOTE: Using SPI Mode 0. This really should have the FTDI + # clock the read bits in on the rising edge, at least base don + # my understanding of SPI. However, spi.py reads the bits on + # the falling edge of the clock. For the 93LC56B, this is + # exactly what we want. However, if spi.py ever gets changed, + # will need to do writes and reads seperately with reads in + # SPI Mode 1. + port = self._mw.get_port(0, freq=self._freq, + mode=self._mode, bidir=self._bidir) + + # readlen is byte len so make sure it is an even number since + # EEPROM is 16-bit device. + if (readlen & 0x01): + err = "readlen must be even - the EEPROM is a 16-bit device" + raise SpiIOError(err) + + data = port.exchange(self._SBOC_read+[0x00], readlen) + + # byte swap to handle data in little endian (from: + # https://stackoverflow.com/questions/36096292/ + # efficient-way-to-swap-bytes-in-python) + data[0::2], data[1::2] = data[1::2], data[0::2] + + #print('DATA: ', data) + #words = spack('H'*(len(data)//2), data) + + return data.tobytes() + + def calc_eeprom_checksum(self, data): + """Calculate EEPROM checksum over the data + + :param bytes data: data to compute checksum over. Must be + an even number of bytes to properly + compute checksum. + """ + + if isinstance(data, bytes): + pass + elif isinstance(data, array): + data = data.tobytes() + elif isinstance(data, list): + data = bytes(data) + else: + raise SpiIOError("incompatible type for data") + + if (len(data) & 0x01): + err = "data length must be even - the EEPROM is a 16-bit device" + raise SpiIOError(err) + + # NOTE: checksum is computed using 16-bit values in little + # endian ordering + checksum = 0xAAAA + for idx in range(0, len(data), 2): + val = ((data[idx+1] << 8) + data[idx]) & 0x0ffff + checksum = val^checksum + checksum = (((checksum << 1) & 0x0ffff) | + ((checksum >> 15) & 0x0ffff)) + + return checksum + + def write_word(self, addr, word): + port = self._mw.get_port(0, freq=self._freq, + mode=self._mode, bidir=self._bidir) + + # Must first enable Erase/Write + port.exchange(self._SBOC_ewen) + + # Send the word, LSB first (little endian) + port.exchange(self._SBOC_write+[(addr&0xFF), + word & 0x0000ff, + (word & 0x00ff00) >> 8]) + + # Wait the write time + sleep(self._Twc) + + # send a stop condition if sent at least 1 read with stop + # False. Data is thrown away. + status = port.read(1) + print('Status: {}'.format(status)) + + # Check the last bit of the last byte to make sure it is high + # for Ready + if ((status[-1] & 0x01) == 0x00): + raise SpiIOError('ERROR: SPI Write never completed!') + + # Now disable Erase/Write since done with this write + port.exchange(self._SBOC_ewds) + + + # Write multiple bytes starting at byte address, addr. Length of + # data must be a multiple of 2 since the EEPROM is 16-bits. So + # extend data by 1 byte if this is not the case. + def write(self, addr, data): + + if isinstance(data, bytes): + pass + elif isinstance(data, array): + data = data.tobytes() + elif isinstance(data, list): + data = bytes(data) + else: + raise SpiIOError("incompatible type for data") + + # If addr is odd, raise an exception since it must be even + if (addr & 0x01): + err = "write addr must be even - the EEPROM is a 16-bit device" + raise SpiIOError(err) + + wd_addr = (addr >> 1) # convert to word address + + # if the byte data is an odd number of bytes, force it to be + # on 16-bit divisions + if (len(data) & 0x01): + err = "data length must be even - the EEPROM is a 16-bit device" + raise SpiIOError(err) + + port = self._mw.get_port(0, freq=self._freq, + mode=self._mode, bidir=self._bidir) + + # Must first enable Erase/Write + port.exchange(self._SBOC_ewen) + + for idx in range(0, len(data), 2): + # Send the word, MSB first + port.exchange(self._SBOC_write+[(wd_addr&0xFF), + data[idx+1], + data[idx]]) + + # Wait the write time + sleep(self._Twc) + + # send a stop condition if sent at least 1 read with stop + # False. Data is thrown away. + status = port.read(1) + + # Check the last bit of the last byte to make sure it is + # high for Ready + if ((status[-1] & 0x01) == 0x00): + print('ERROR: Last write never completed! Aborting!') + break + + # increment to the next word address + wd_addr += 1 + + # Now disable Erase/Write since done with this write + port.exchange(self._SBOC_ewds) + + + def close(self): + """Close the MicroWire connection""" + self._mw.terminate() + + +class MicrowireTestCase(unittest.TestCase): + """FTDI MicroWire driver test case + + Simple test to demonstrate MicroWire feature. + + Please ensure that the HW you connect to the FTDI port A does match + the encoded configuration. GPIOs can be driven high or low, so check + your HW setup before running this test as it might damage your HW. + + Do NOT run this test if you use FTDI port A as an UART or I2C + bridge -or any unsupported setup!! You've been warned. + """ + + def test_microwire1(self): + """Test reading EEPROM with read_word() and read_all()""" + chksumAct = 0 + chksumExp = 1 + chksumSav = 2 + + print() + mw = MicrowireData93LC56BTest() + mw.open() + + try: + # Test read_word() + data = b'' + for addr in range(0,256//2): + data += mw.read_word(addr) + print(hexdump(data)) + + # check that the right number of bytes were read + self.assertTrue(len(data) == 256) + + # Pull out actual checksum from EEPROM data + chksumAct = (data[-1] << 8) | data[-2] + + # compute expected checksum value over the EEPROM + # contents, except the EEPROM word + chksumExp = mw.calc_eeprom_checksum(data[:-2]) + + print('Checksum Actual: 0x{:04x} Expected: 0x{:04x}' + .format(chksumAct,chksumExp)) + + except FtdiError: + self.assertTrue(chksumAct == chksumExp) + except SpiIOError: + self.assertTrue(chksumAct == chksumExp) + else: + self.assertTrue(chksumAct == chksumExp) + + mw.close() + + # save checksum so can be compared when data is read with + # read_all() + chksumSav = chksumAct + + # reset chksumAct & chksumExp + chksumAct = 0 + chksumExp = 1 + + print() + mw = MicrowireData93LC56BTest() + mw.open() + + try: + # Test read_all() + data = mw.read_all(256) + print(hexdump(data)) + + # check that the right number of bytes were read + self.assertTrue(len(data) == 256) + + # Pull out actual checksum from EEPROM data + chksumAct = (data[-1] << 8) | data[-2] + + # compute expected checksum value over the EEPROM + # contents, except the EEPROM word + chksumExp = mw.calc_eeprom_checksum(data[:-2]) + + print('Checksum Actual: 0x{:04x} Expected: 0x{:04x}' + .format(chksumAct,chksumExp)) + + except FtdiError: + self.assertTrue(chksumAct == chksumExp) + except SpiIOError: + self.assertTrue(chksumAct == chksumExp) + else: + self.assertTrue(chksumAct == chksumExp) + self.assertTrue(chksumAct == chksumSav) + + mw.close() + +def suite(): + suite_ = unittest.TestSuite() + suite_.addTest(unittest.makeSuite(MicrowireTestCase, 'test')) + return suite_ + + +if __name__ == '__main__': + testmod(modules[__name__]) + FtdiLogger.log.addHandler(logging.StreamHandler(stdout)) + level = environ.get('FTDI_LOGLEVEL', 'info').upper() + try: + loglevel = getattr(logging, level) + except AttributeError: + raise ValueError('Invalid log level: %s', level) + FtdiLogger.set_level(loglevel) + unittest.main(defaultTest='suite') diff --git a/pyftdi/tests/spi.py b/pyftdi/tests/spi.py old mode 100755 new mode 100644 diff --git a/pyftdi/tests/uart.py b/pyftdi/tests/uart.py old mode 100755 new mode 100644 diff --git a/setup.py b/setup.py old mode 100755 new mode 100644