diff --git a/nesc/whip6/apps/test_wnesc_compile.py b/nesc/whip6/apps/test_wnesc_compile.py index 15bc7c5..4db2ece 100755 --- a/nesc/whip6/apps/test_wnesc_compile.py +++ b/nesc/whip6/apps/test_wnesc_compile.py @@ -95,6 +95,6 @@ def get_status_str(value): f.write('{},{},{},{}\n'.format(app, data['time'], data['size'], data['overflow'])) print('----------------------------------------------------------') -print(colored('good apps:', 'green'), ','.join(apps_good)) -print(colored('bad apps:', 'red'), ','.join(apps_bad)) -print('result: {}/{}'.format(len(apps_good), len(apps_good) + len(apps_bad))) +print((colored('good apps:', 'green'), ','.join(apps_good))) +print((colored('bad apps:', 'red'), ','.join(apps_bad))) +print(('result: {}/{}'.format(len(apps_good), len(apps_good) + len(apps_bad)))) diff --git a/nesc/whip6/platforms/parts/mcu/cortex-m3/scripts/symbols2sizes.py b/nesc/whip6/platforms/parts/mcu/cortex-m3/scripts/symbols2sizes.py index 2490806..178249f 100755 --- a/nesc/whip6/platforms/parts/mcu/cortex-m3/scripts/symbols2sizes.py +++ b/nesc/whip6/platforms/parts/mcu/cortex-m3/scripts/symbols2sizes.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # # whip6: Warsaw High-performance IPv6. # @@ -24,7 +24,7 @@ 'list of symbol (output of nm -t posix) with files of the sizes ' 'corresponding to the symbols. They can then be visualized with tools ' 'like Baobab.') -parser.add_argument('--kinds', '-k', choices=KINDS.keys(), default='flash', +parser.add_argument('--kinds', '-k', choices=list(KINDS.keys()), default='flash', help='kinds of symbols to choose') parser.add_argument('output_dir', help='output directory') @@ -47,8 +47,8 @@ filename = os.path.join(args.output_dir, *parts) try: os.makedirs(os.path.dirname(filename)) - except OSError, e: + except OSError as e: if e.errno != errno.EEXIST: raise with open(filename, 'wb') as f: - f.write('X' * size) + f.write(b'X' * size) diff --git a/nesc/whip6/platforms/tools/compiler/make/makebuild.py b/nesc/whip6/platforms/tools/compiler/make/makebuild.py index 793c24a..e29f103 100644 --- a/nesc/whip6/platforms/tools/compiler/make/makebuild.py +++ b/nesc/whip6/platforms/tools/compiler/make/makebuild.py @@ -36,8 +36,8 @@ def _gen_makefile(self, f): include_paths.extend(self.collect_config_list(INCL_PATHS)) defines = self.collect_config_list(DEFINITIONS) cflags = self.collect_config_list(C_FLAGS) - cflags.extend(map(lambda p: '-I' + p, include_paths)) - cflags.extend(map(lambda d: '-D' + d, defines)) + cflags.extend(['-I' + p for p in include_paths]) + cflags.extend(['-D' + d for d in defines]) ldflags = self.collect_config_list(LD_FLAGS) @@ -50,15 +50,15 @@ def _gen_makefile(self, f): makefile = self.find_config_value(MAKEFILE) if not makefile: raise RuntimeError('no "%s" specified' % (MAKEFILE,)) - print >>f, 'APP_NAME :=', app_name - print >>f, 'BOARD_NAME :=', self.board - print >>f, 'PROJECT_ROOT :=', self.project_root - print >>f, 'ALT_PROJECT_ROOT :=', self.alt_project_root - print >>f, 'BUILD_DIR :=', self.build_dir - print >>f, 'LDFLAGS += ', ' '.join(ldflags) - print >>f, 'CFLAGS += ', ' '.join(cflags) - print >>f, 'OBJS :=', ' '.join(objects) - print >>f, 'include ', makefile + print('APP_NAME :=', app_name, file=f) + print('BOARD_NAME :=', self.board, file=f) + print('PROJECT_ROOT :=', self.project_root, file=f) + print('ALT_PROJECT_ROOT :=', self.alt_project_root, file=f) + print('BUILD_DIR :=', self.build_dir, file=f) + print('LDFLAGS += ', ' '.join(ldflags), file=f) + print('CFLAGS += ', ' '.join(cflags), file=f) + print('OBJS :=', ' '.join(objects), file=f) + print('include ', makefile, file=f) def run_step(self): makeopts = self.collect_config_list(MAKE_OPTS) diff --git a/nesc/whip6/platforms/tools/compiler/nescc/nesccompile.py b/nesc/whip6/platforms/tools/compiler/nescc/nesccompile.py index cdeb65d..bdabe36 100644 --- a/nesc/whip6/platforms/tools/compiler/nescc/nesccompile.py +++ b/nesc/whip6/platforms/tools/compiler/nescc/nesccompile.py @@ -144,8 +144,8 @@ def _find_native_gcc(self): if 'LLVM' in version: continue if c != 'gcc-4.6': - print 'WARNING: gcc-4.6 not found. Note that this is the recommended' - print 'compiler version for NesC.' + print('WARNING: gcc-4.6 not found. Note that this is the recommended') + print('compiler version for NesC.') return c except Exception: pass diff --git a/nesc/whip6/platforms/tools/other/gengdbinit.py b/nesc/whip6/platforms/tools/other/gengdbinit.py index d6804d2..5f2a0c3 100644 --- a/nesc/whip6/platforms/tools/other/gengdbinit.py +++ b/nesc/whip6/platforms/tools/other/gengdbinit.py @@ -27,7 +27,7 @@ def _gen_gdbinit(self, f): paths.append(config[CONF_PATH]) paths.extend(self.collect_config_list(INCL_PATHS)) for path in paths: - print >>f, 'directory ', path + print('directory ', path, file=f) def run_step(self): with open(os.path.join(self.build_dir, 'gdbinit'), 'w') as f: diff --git a/nesc/whip6/platforms/tools/other/make_ext_libs.py b/nesc/whip6/platforms/tools/other/make_ext_libs.py index ef5dbfc..d09037f 100644 --- a/nesc/whip6/platforms/tools/other/make_ext_libs.py +++ b/nesc/whip6/platforms/tools/other/make_ext_libs.py @@ -39,7 +39,7 @@ def run_step(self): if RUN_MAKE_IN in config: for l in config[RUN_MAKE_IN]: relative_l = l[(len(self.project_root) + 1):] - print colored('Running make in %s ...' % (relative_l,), 'cyan') + print(colored('Running make in %s ...' % (relative_l,), 'cyan')) self.call('make', '-j', str(num_threads), '-C', l, *makeopts) # Exports the BuildStep to make it visible for smake diff --git a/nesc/whip6/platforms/tools/other/makeapp.py b/nesc/whip6/platforms/tools/other/makeapp.py index a04d422..0b374a6 100644 --- a/nesc/whip6/platforms/tools/other/makeapp.py +++ b/nesc/whip6/platforms/tools/other/makeapp.py @@ -25,10 +25,10 @@ def __init__(self, project_root, configs, flags): BuildStep.__init__(self, project_root, configs, flags) def run_step(self): - coreName = raw_input("What will be the core name of the " + coreName = input("What will be the core name of the " "app (fe. ButtonRadio, Blink)?\n> ").strip() - if not re.match(ur'^[A-Z]\w+$', coreName): + if not re.match(r'^[A-Z]\w+$', coreName): raise BuildError("Error: Name must match regex ^[A-Z]\w+$") newAppPath = join(self.project_root, APPS, coreName) @@ -36,7 +36,7 @@ def run_step(self): if os.path.exists(newAppPath): raise BuildError("Error: path %s already exists" % newAppPath) - srcAppName = raw_input("What will be the base app? This must be" + srcAppName = input("What will be the base app? This must be" " an existing app folder name\n" "[Blink] > ").strip() @@ -48,7 +48,7 @@ def run_step(self): if not os.path.exists(srcAppPath): raise BuildError("Error: Path %s does not exist" % srcAppPath) - repName = raw_input("What will be the replace word? This word will be" + repName = input("What will be the replace word? This word will be" " replaced in all files with %s\n" "[%s] > " % (coreName, srcAppName)).strip() diff --git a/nesc/whip6/platforms/tools/other/setuniqueids.py b/nesc/whip6/platforms/tools/other/setuniqueids.py index 1804ba8..8208915 100644 --- a/nesc/whip6/platforms/tools/other/setuniqueids.py +++ b/nesc/whip6/platforms/tools/other/setuniqueids.py @@ -9,7 +9,7 @@ # import glob import time -import cStringIO +import io from os.path import join from build_step import BuildStep @@ -37,21 +37,21 @@ def _replaceWithNumbers(self, cFile, bestIdsList, firstId): contents = f.read() contents = contents.replace(REMOVE_LINE, '') parts = contents.split(SYMBOL) - io = cStringIO.StringIO() - io.write(parts[0]) + stream = io.StringIO() + stream.write(parts[0]) for p in parts[1:]: - io.write(str(bestIdsList[firstId])) - io.write(p) + stream.write(str(bestIdsList[firstId])) + stream.write(p) firstId += 1 with open(cFile, 'w') as f: - f.write(io.getvalue()) + f.write(stream.getvalue()) return firstId def _findEfficientNumbers(self, maxNumber): """Efficient numbers are thouse which have no 0s (101 - bad), are short and have a small digit sum""" candidates = [] - for i in xrange(1, maxNumber + 1): + for i in range(1, maxNumber + 1): hasZero, cost = self._evaluateNumber(i) if not hasZero: candidates.append((cost, i)) @@ -66,7 +66,7 @@ def _evaluateNumber(self, n): digSum += d if d == 0: return True, -1 - n /= 10 + n //= 10 digCnt += 1 return (False, (digCnt + digSum)) diff --git a/nesc/whip6/platforms/tools/other/warn_about_printfs.py b/nesc/whip6/platforms/tools/other/warn_about_printfs.py index 6a8e4dd..d2ad847 100644 --- a/nesc/whip6/platforms/tools/other/warn_about_printfs.py +++ b/nesc/whip6/platforms/tools/other/warn_about_printfs.py @@ -15,7 +15,7 @@ from build_step import BuildStep -PRINTF_PATTERN = re.compile(ur'printf\s*\(\s*"') +PRINTF_PATTERN = re.compile(r'printf\s*\(\s*"') WARNING_MSG = ''' +-------------------------------------------------------------------+ | | @@ -35,7 +35,7 @@ def run_step(self): for cFile in c_files: with open(cFile, 'r') as f: if PRINTF_PATTERN.search(f.read()): - print colored(WARNING_MSG, 'yellow') + print(colored(WARNING_MSG, 'yellow')) return BuildStepImpl = WarnAboutPrintfs diff --git a/nesc/whip6/platforms/tools/programmer/ccbsl/cc2538-bsl.py b/nesc/whip6/platforms/tools/programmer/ccbsl/cc2538-bsl.py old mode 100755 new mode 100644 index 6d0668c..44bb904 --- a/nesc/whip6/platforms/tools/programmer/ccbsl/cc2538-bsl.py +++ b/nesc/whip6/platforms/tools/programmer/ccbsl/cc2538-bsl.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # Copyright (c) 2014, Jelmer Tiete . # All rights reserved. @@ -37,23 +37,22 @@ # Make sure you don't lock yourself out!! (enable backdoor in your firmware) # More info at https://github.com/JelmerT/cc2538-bsl -from __future__ import print_function from subprocess import Popen, PIPE -import sys, getopt +import sys +import getopt import glob import time -import tempfile import os -import subprocess import struct import binascii import traceback try: import magic + magic.from_file have_magic = True -except ImportError: +except (ImportError, AttributeError): have_magic = False try: @@ -62,27 +61,19 @@ except ImportError: have_hex_support = False -#version -VERSION_STRING = "2.0" +# version +__version__ = "2.1" # Verbose level QUIET = 5 -# Check which version of Python is running -PY3 = sys.version_info >= (3,0) - try: import serial except ImportError: print('{} requires the Python serial library'.format(sys.argv[0])) - print('Please install it with one of the following:') + print('Please install it with:') print('') - if PY3: - print(' Ubuntu: sudo apt-get install python3-serial') - print(' Mac: sudo port install py34-serial') - else: - print(' Ubuntu: sudo apt-get install python-serial') - print(' Mac: sudo port install py-serial') + print(' pip3 install pyserial') sys.exit(1) @@ -91,14 +82,16 @@ def mdebug(level, message, attr='\n'): print(message, end=attr, file=sys.stderr) # Takes chip IDs (obtained via Get ID command) to human-readable names -CHIP_ID_STRS = {0xb964: 'CC2538'} +CHIP_ID_STRS = {0xb964: 'CC2538', + 0xb965: 'CC2538' + } -RETURN_CMD_STRS = {0x40: 'Success', - 0x41: 'Unknown command', - 0x42: 'Invalid command', - 0x43: 'Invalid address', - 0x44: 'Flash fail' - } +RETURN_CMD_STRS = {0x40: 'Success', + 0x41: 'Unknown command', + 0x42: 'Invalid command', + 0x43: 'Invalid address', + 0x44: 'Flash fail' + } COMMAND_RET_SUCCESS = 0x40 COMMAND_RET_UNKNOWN_CMD = 0x41 @@ -106,9 +99,11 @@ def mdebug(level, message, attr='\n'): COMMAND_RET_INVALID_ADR = 0x43 COMMAND_RET_FLASH_FAIL = 0x44 + class CmdException(Exception): pass + class FirmwareFile(object): HEX_FILE_EXTENSIONS = ('hex', 'ihx', 'ihex') @@ -130,20 +125,19 @@ def __init__(self, path): path -- A str with the path to the firmware file. Attributes: - bytes: A bytearray with firmware contents ready to send to the device + bytes: A bytearray with firmware contents ready to send to the + device """ self._crc32 = None firmware_is_hex = False if have_magic: - file_type = bytearray(magic.from_file(path, True)) + file_type = magic.from_file(path, mime=True) - #from_file() returns bytes with PY3, str with PY2. This comparison - #will be True in both cases""" - if file_type == b'text/plain': + if file_type == 'text/plain': firmware_is_hex = True mdebug(5, "Firmware file: Intel Hex") - elif file_type == b'application/octet-stream': + elif file_type == 'application/octet-stream': mdebug(5, "Firmware file: Raw Binary") else: error_str = "Could not determine firmware type. Magic " \ @@ -188,35 +182,65 @@ def crc32(self): return self._crc32 + class CommandInterface(object): + ACK_BYTE = 0xCC NACK_BYTE = 0x33 - def open(self, aport='/dev/tty.usbserial-000013FAB', abaudrate=500000): - self.sp = serial.Serial( - port=aport, - baudrate=abaudrate, # baudrate - bytesize=8, # number of databits - parity=serial.PARITY_NONE, - stopbits=1, - xonxoff=0, # enable software flow control - rtscts=0, # disable RTS/CTS flow control - timeout=0.5 # set a timeout value, None for waiting forever - ) - def invoke_bootloader(self, dtr_active_high=False): + def open(self, aport=None, abaudrate=500000): + # Try to create the object using serial_for_url(), or fall back to the + # old serial.Serial() where serial_for_url() is not supported. + # serial_for_url() is a factory class and will return a different + # object based on the URL. For example serial_for_url("/dev/tty.") + # will return a serialposix.Serial object for Ubuntu or Mac OS; + # serial_for_url("COMx") will return a serialwin32.Serial oject for Windows OS. + # For that reason, we need to make sure the port doesn't get opened at + # this stage: We need to set its attributes up depending on what object + # we get. + try: + self.sp = serial.serial_for_url(aport, do_not_open=True, timeout=10) + except AttributeError: + self.sp = serial.Serial(port=None, timeout=10) + self.sp.port = aport + + if ((os.name == 'nt' and isinstance(self.sp, serial.serialwin32.Serial)) or \ + (os.name == 'posix' and isinstance(self.sp, serial.serialposix.Serial))): + self.sp.baudrate=abaudrate # baudrate + self.sp.bytesize=8 # number of databits + self.sp.parity=serial.PARITY_NONE # parity + self.sp.stopbits=1 # stop bits + self.sp.xonxoff=0 # s/w (XON/XOFF) flow control + self.sp.rtscts=0 # h/w (RTS/CTS) flow control + self.sp.timeout=0.5 # set the timeout value + + self.sp.open() + + def invoke_bootloader(self, dtr_active_high=False, inverted=False): # Use the DTR and RTS lines to control bootloader and the !RESET pin. # This can automatically invoke the bootloader without the user # having to toggle any pins. + # + # If inverted is False (default): # DTR: connected to the bootloader pin # RTS: connected to !RESET - self.sp.setDTR(1 if not dtr_active_high else 0) - self.sp.setRTS(0) - self.sp.setRTS(1) - self.sp.setRTS(0) - time.sleep(0.002) # Make sure the pin is still asserted when the cc2538 - # comes out of reset. This fixes an issue where there - # wasn't enough delay here on Mac. - self.sp.setDTR(0 if not dtr_active_high else 1) + # If inverted is True, pin connections are the other way round + if inverted: + set_bootloader_pin = self.sp.setRTS + set_reset_pin = self.sp.setDTR + else: + set_bootloader_pin = self.sp.setDTR + set_reset_pin = self.sp.setRTS + + set_bootloader_pin(1 if not dtr_active_high else 0) + set_reset_pin(0) + set_reset_pin(1) + set_reset_pin(0) + # Make sure the pin is still asserted when the chip + # comes out of reset. This fixes an issue where + # there wasn't enough delay here on Mac. + time.sleep(0.002) + set_bootloader_pin(0 if not dtr_active_high else 1) # Some boards have a co-processor that detects this sequence here and # then drives the main chip's BSL enable and !RESET pins. Depending on @@ -226,13 +250,12 @@ def invoke_bootloader(self, dtr_active_high=False): # it has actually entered its bootloader mode. # # See contiki-os/contiki#1533 - time.sleep(0.002) + time.sleep(0.1) def close(self): self.sp.close() - - def _wait_for_ack(self, info = "", timeout = 1): + def _wait_for_ack(self, info="", timeout=1): stop = time.time() + timeout got = bytearray(2) while got[-2] != 00 or got[-1] not in (CommandInterface.ACK_BYTE, @@ -266,41 +289,31 @@ def _encode_addr(self, addr): byte2 = (addr >> 8) & 0xFF byte1 = (addr >> 16) & 0xFF byte0 = (addr >> 24) & 0xFF - if PY3: - return bytes([byte0, byte1, byte2, byte3]) - else: - return (chr(byte0) + chr(byte1) + chr(byte2) + chr(byte3)) + return bytes([byte0, byte1, byte2, byte3]) def _decode_addr(self, byte0, byte1, byte2, byte3): return ((byte3 << 24) | (byte2 << 16) | (byte1 << 8) | (byte0 << 0)) def _calc_checks(self, cmd, addr, size): - return ((sum(bytearray(self._encode_addr(addr))) - +sum(bytearray(self._encode_addr(size))) - +cmd) - &0xFF) + return ((sum(bytearray(self._encode_addr(addr))) + + sum(bytearray(self._encode_addr(size))) + + cmd) & 0xFF) def _write(self, data, is_retry=False): - if PY3: - if type(data) == int: - assert data < 256 - goal = 1 - written = self.sp.write(bytes([data])) - elif type(data) == bytes or type(data) == bytearray: - goal = len(data) - written = self.sp.write(data) - else: - raise CmdException("Internal Error. Bad data type: {}".format(type(data))) + if type(data) == int: + assert data < 256 + goal = 1 + written = self.sp.write(bytes([data])) + elif type(data) == bytes or type(data) == bytearray: + goal = len(data) + written = self.sp.write(data) else: - if type(data) == int: - assert data < 256 - goal = 1 - written = self.sp.write(chr(data)) - else: - goal = len(data) - written = self.sp.write(data) + raise CmdException("Internal Error. Bad data type: {}" + .format(type(data))) + if written < goal: - mdebug(10, "*** Only wrote {} of target {} bytes".format(written, goal)) + mdebug(10, "*** Only wrote {} of target {} bytes" + .format(written, goal)) if is_retry and written == 0: raise CmdException("Failed to write data on the serial bus") mdebug(10, "*** Retrying write for remainder") @@ -322,7 +335,6 @@ def sendNAck(self): self._write(0x33) return - def receivePacket(self): # stop = time.time() + 5 # got = None @@ -334,34 +346,36 @@ def receivePacket(self): # if not got: # raise CmdException("No response to %s" % info) - size = got[0] #rcv size - chks = got[1] #rcv checksum - data = bytearray(self._read(size - 2)) # rcv data + size = got[0] # rcv size + chks = got[1] # rcv checksum + data = bytearray(self._read(size - 2)) # rcv data mdebug(10, "*** received %x bytes" % size) - if chks == sum(data)&0xFF: + if chks == sum(data) & 0xFF: self.sendAck() return data else: self.sendNAck() - #TODO: retry receiving! + # TODO: retry receiving! raise CmdException("Received packet checksum error") return 0 def sendSynch(self): cmd = 0x55 - self.sp.flushInput() #flush serial input buffer for first ACK reception + # flush serial input buffer for first ACK reception + self.sp.flushInput() mdebug(10, "*** sending synch sequence") - self._write(cmd) # send U - self._write(cmd) # send U + self._write(cmd) # send U + self._write(cmd) # send U return self._wait_for_ack("Synch (0x55 0x55)", 2) def checkLastCmd(self): stat = self.cmdGetStatus() if not (stat): - raise CmdException("No response from target on status request. (Did you disable the bootloader?)") + raise CmdException("No response from target on status request. " + "(Did you disable the bootloader?)") if stat[0] == COMMAND_RET_SUCCESS: mdebug(10, "Command Successful") @@ -369,19 +383,19 @@ def checkLastCmd(self): else: stat_str = RETURN_CMD_STRS.get(stat[0], None) if stat_str is None: - mdebug(0, 'Warning: unrecognized status returned 0x%x' % stat[0]) + mdebug(0, "Warning: unrecognized status returned " + "0x%x" % stat[0]) else: mdebug(0, "Target returned: 0x%x, %s" % (stat[0], stat_str)) return 0 - def cmdPing(self): cmd = 0x20 lng = 3 - self._write(lng) # send size - self._write(cmd) # send checksum - self._write(cmd) # send data + self._write(lng) # send size + self._write(cmd) # send checksum + self._write(cmd) # send data mdebug(10, "*** Ping command (0x20)") if self._wait_for_ack("Ping (0x20)"): @@ -391,9 +405,9 @@ def cmdReset(self): cmd = 0x25 lng = 3 - self._write(lng) # send size - self._write(cmd) # send checksum - self._write(cmd) # send data + self._write(lng) # send size + self._write(cmd) # send checksum + self._write(cmd) # send data mdebug(10, "*** Reset command (0x25)") if self._wait_for_ack("Reset (0x25)"): @@ -403,15 +417,17 @@ def cmdGetChipId(self): cmd = 0x28 lng = 3 - self._write(lng) # send size - self._write(cmd) # send checksum - self._write(cmd) # send data + self._write(lng) # send size + self._write(cmd) # send checksum + self._write(cmd) # send data mdebug(10, "*** GetChipId command (0x28)") if self._wait_for_ack("Get ChipID (0x28)"): - version = self.receivePacket() # 4 byte answ, the 2 LSB hold chip ID + # 4 byte answ, the 2 LSB hold chip ID + version = self.receivePacket() if self.checkLastCmd(): - assert len(version) == 4, "Unreasonable chip id: %s" % repr(version) + assert len(version) == 4, ("Unreasonable chip " + "id: %s" % repr(version)) mdebug(10, " Version 0x%02X%02X%02X%02X" % tuple(version)) chip_id = (version[2] << 8) | version[3] return chip_id @@ -422,9 +438,9 @@ def cmdGetStatus(self): cmd = 0x23 lng = 3 - self._write(lng) # send size - self._write(cmd) # send checksum - self._write(cmd) # send data + self._write(lng) # send size + self._write(cmd) # send checksum + self._write(cmd) # send data mdebug(10, "*** GetStatus command (0x23)") if self._wait_for_ack("Get Status (0x23)"): @@ -435,9 +451,9 @@ def cmdSetXOsc(self): cmd = 0x29 lng = 3 - self._write(lng) # send size - self._write(cmd) # send checksum - self._write(cmd) # send data + self._write(lng) # send size + self._write(cmd) # send checksum + self._write(cmd) # send data mdebug(10, "*** SetXOsc command (0x29)") if self._wait_for_ack("SetXOsc (0x29)"): @@ -445,133 +461,136 @@ def cmdSetXOsc(self): # UART speed (needs) to be changed! def cmdRun(self, addr): - cmd=0x22 - lng=7 + cmd = 0x22 + lng = 7 - self._write(lng) # send length - self._write(self._calc_checks(cmd,addr,0)) # send checksum - self._write(cmd) # send cmd - self._write(self._encode_addr(addr)) # send addr + self._write(lng) # send length + self._write(self._calc_checks(cmd, addr, 0)) # send checksum + self._write(cmd) # send cmd + self._write(self._encode_addr(addr)) # send addr mdebug(10, "*** Run command(0x22)") return 1 def cmdEraseMemory(self, addr, size): - cmd=0x26 - lng=11 + cmd = 0x26 + lng = 11 - self._write(lng) # send length - self._write(self._calc_checks(cmd,addr,size)) # send checksum - self._write(cmd) # send cmd - self._write(self._encode_addr(addr)) # send addr - self._write(self._encode_addr(size)) # send size + self._write(lng) # send length + self._write(self._calc_checks(cmd, addr, size)) # send checksum + self._write(cmd) # send cmd + self._write(self._encode_addr(addr)) # send addr + self._write(self._encode_addr(size)) # send size mdebug(10, "*** Erase command(0x26)") - if self._wait_for_ack("Erase memory (0x26)",10): + if self._wait_for_ack("Erase memory (0x26)", 10): return self.checkLastCmd() def cmdBankErase(self): cmd = 0x2C lng = 3 - self._write(lng) # send length - self._write(cmd) # send checksum - self._write(cmd) # send cmd + self._write(lng) # send length + self._write(cmd) # send checksum + self._write(cmd) # send cmd mdebug(10, "*** Bank Erase command(0x2C)") - if self._wait_for_ack("Bank Erase (0x2C)",10): + if self._wait_for_ack("Bank Erase (0x2C)", 10): return self.checkLastCmd() def cmdCRC32(self, addr, size): - cmd=0x27 - lng=11 + cmd = 0x27 + lng = 11 - self._write(lng) # send length - self._write(self._calc_checks(cmd,addr,size)) # send checksum - self._write(cmd) # send cmd - self._write(self._encode_addr(addr)) # send addr - self._write(self._encode_addr(size)) # send size + self._write(lng) # send length + self._write(self._calc_checks(cmd, addr, size)) # send checksum + self._write(cmd) # send cmd + self._write(self._encode_addr(addr)) # send addr + self._write(self._encode_addr(size)) # send size mdebug(10, "*** CRC32 command(0x27)") - if self._wait_for_ack("Get CRC32 (0x27)",1): - crc=self.receivePacket() + if self._wait_for_ack("Get CRC32 (0x27)", 1): + crc = self.receivePacket() if self.checkLastCmd(): - return self._decode_addr(crc[3],crc[2],crc[1],crc[0]) + return self._decode_addr(crc[3], crc[2], crc[1], crc[0]) def cmdCRC32CC26xx(self, addr, size): cmd = 0x27 lng = 15 - self._write(lng) # send length - self._write(self._calc_checks(cmd, addr, size)) # send checksum - self._write(cmd) # send cmd - self._write(self._encode_addr(addr)) # send addr - self._write(self._encode_addr(size)) # send size - self._write(self._encode_addr(0x00000000)) # send number of reads + self._write(lng) # send length + self._write(self._calc_checks(cmd, addr, size)) # send checksum + self._write(cmd) # send cmd + self._write(self._encode_addr(addr)) # send addr + self._write(self._encode_addr(size)) # send size + self._write(self._encode_addr(0x00000000)) # send number of reads mdebug(10, "*** CRC32 command(0x27)") if self._wait_for_ack("Get CRC32 (0x27)", 1): - crc=self.receivePacket() + crc = self.receivePacket() if self.checkLastCmd(): return self._decode_addr(crc[3], crc[2], crc[1], crc[0]) def cmdDownload(self, addr, size): - cmd=0x21 - lng=11 + cmd = 0x21 + lng = 11 - if (size % 4) != 0: # check for invalid data lengths - raise Exception('Invalid data size: %i. Size must be a multiple of 4.' % size) + if (size % 4) != 0: # check for invalid data lengths + raise Exception('Invalid data size: %i. ' + 'Size must be a multiple of 4.' % size) - self._write(lng) # send length - self._write(self._calc_checks(cmd,addr,size)) # send checksum - self._write(cmd) # send cmd - self._write(self._encode_addr(addr)) # send addr - self._write(self._encode_addr(size)) # send size + self._write(lng) # send length + self._write(self._calc_checks(cmd, addr, size)) # send checksum + self._write(cmd) # send cmd + self._write(self._encode_addr(addr)) # send addr + self._write(self._encode_addr(size)) # send size mdebug(10, "*** Download command (0x21)") - if self._wait_for_ack("Download (0x21)",2): + if self._wait_for_ack("Download (0x21)", 2): return self.checkLastCmd() def cmdSendData(self, data): - cmd=0x24 - lng=len(data)+3 + cmd = 0x24 + lng = len(data)+3 # TODO: check total size of data!! max 252 bytes! - self._write(lng) # send size - self._write((sum(bytearray(data))+cmd)&0xFF) # send checksum - self._write(cmd) # send cmd - self._write(bytearray(data)) # send data + self._write(lng) # send size + self._write((sum(bytearray(data))+cmd) & 0xFF) # send checksum + self._write(cmd) # send cmd + self._write(bytearray(data)) # send data mdebug(10, "*** Send Data (0x24)") - if self._wait_for_ack("Send data (0x24)",10): + if self._wait_for_ack("Send data (0x24)", 10): return self.checkLastCmd() - def cmdMemRead(self, addr): # untested - cmd=0x2A - lng=8 + def cmdMemRead(self, addr): # untested + cmd = 0x2A + lng = 8 - self._write(lng) # send length - self._write(self._calc_checks(cmd,addr,4)) # send checksum - self._write(cmd) # send cmd - self._write(self._encode_addr(addr)) # send addr - self._write(4) # send width, 4 bytes + self._write(lng) # send length + self._write(self._calc_checks(cmd, addr, 4)) # send checksum + self._write(cmd) # send cmd + self._write(self._encode_addr(addr)) # send addr + self._write(4) # send width, 4 bytes mdebug(10, "*** Mem Read (0x2A)") - if self._wait_for_ack("Mem Read (0x2A)",1): + if self._wait_for_ack("Mem Read (0x2A)", 1): data = self.receivePacket() if self.checkLastCmd(): - return data # self._decode_addr(ord(data[3]),ord(data[2]),ord(data[1]),ord(data[0])) + # self._decode_addr(ord(data[3]), + # ord(data[2]),ord(data[1]),ord(data[0])) + return data def cmdMemReadCC26xx(self, addr): cmd = 0x2A lng = 9 - self._write(lng) # send length - self._write(self._calc_checks(cmd, addr, 2)) # send checksum - self._write(cmd) # send cmd - self._write(self._encode_addr(addr)) # send addr - self._write(1) # send width, 4 bytes - self._write(1) # send number of reads + self._write(lng) # send length + self._write(self._calc_checks(cmd, addr, 2)) # send checksum + self._write(cmd) # send cmd + self._write(self._encode_addr(addr)) # send addr + self._write(1) # send width, 4 bytes + self._write(1) # send number of reads mdebug(10, "*** Mem Read (0x2A)") if self._wait_for_ack("Mem Read (0x2A)", 1): @@ -579,55 +598,74 @@ def cmdMemReadCC26xx(self, addr): if self.checkLastCmd(): return data - def cmdMemWrite(self, addr, data, width): # untested - # TODO: check width for 1 or 4 and data size - cmd=0x2B - lng=10 + def cmdMemWrite(self, addr, data, width): + if width != len(data): + raise ValueError("width does not match len(data)") + if width != 1 and width != 4: + raise ValueError("width must be 1 or 4") - self._write(lng) # send length - self._write(self._calc_checks(cmd,addr,0)) # send checksum - self._write(cmd) # send cmd - self._write(self._encode_addr(addr)) # send addr - self._write(bytearray(data)) # send data - self._write(width) # send width, 4 bytes + cmd = 0x2B + lng = 8 + len(data) + + content = ( + bytearray([cmd]) + + self._encode_addr(addr) + + bytearray([1 if (width == 4) else 0]) + + bytearray(data) + ) + + self._write(lng) # send length + self._write(sum(content) & 0xFF) # send checksum + self._write(content) mdebug(10, "*** Mem write (0x2B)") - if self._wait_for_ack("Mem Write (0x2B)",2): - return checkLastCmd() + if self._wait_for_ack("Mem Write (0x2B)", 2): + return self.checkLastCmd() # Complex commands section def writeMemory(self, addr, data): lng = len(data) - trsf_size = 248 # amount of data bytes transferred per packet (theory: max 252 + 3) + # amount of data bytes transferred per packet (theory: max 252 + 3) + trsf_size = 248 empty_packet = bytearray((0xFF,) * trsf_size) # Boot loader enable check - # TODO: implement check for all chip sizes & take into account partial firmware uploads - if (lng == 524288): #check if file is for 512K model - if not ((data[524247] & (1 << 4)) >> 4): #check the boot loader enable bit (only for 512K model) - if not ( conf['force'] or query_yes_no("The boot loader backdoor is not enabled "\ - "in the firmware you are about to write to the target. "\ - "You will NOT be able to reprogram the target using this tool if you continue! "\ - "Do you want to continue?","no") ): + # TODO: implement check for all chip sizes & take into account partial + # firmware uploads + if (lng == 524288): # check if file is for 512K model + # check the boot loader enable bit (only for 512K model) + if not ((data[524247] & (1 << 4)) >> 4): + if not (conf['force'] or + query_yes_no("The boot loader backdoor is not enabled " + "in the firmware you are about to write " + "to the target. You will NOT be able to " + "reprogram the target using this tool if " + "you continue! " + "Do you want to continue?", "no")): raise Exception('Aborted by user.') mdebug(5, "Writing %(lng)d bytes starting at address 0x%(addr)08X" % - { 'lng': lng, 'addr': addr}) + {'lng': lng, 'addr': addr}) offs = 0 addr_set = 0 - while lng > trsf_size: #check if amount of remaining data is less then packet size - if data[offs:offs+trsf_size] != empty_packet: #skip packets filled with 0xFF + # check if amount of remaining data is less then packet size + while lng > trsf_size: + # skip packets filled with 0xFF + if data[offs:offs+trsf_size] != empty_packet: if addr_set != 1: - self.cmdDownload(addr,lng) #set starting address if not set + # set starting address if not set + self.cmdDownload(addr, lng) addr_set = 1 - mdebug(5, " Write %(len)d bytes at 0x%(addr)08X" % {'addr': addr, 'len': trsf_size}, '\r') + mdebug(5, " Write %(len)d bytes at 0x%(addr)08X" + % {'addr': addr, 'len': trsf_size}, '\r') sys.stdout.flush() - self.cmdSendData(data[offs:offs+trsf_size]) # send next data packet + # send next data packet + self.cmdSendData(data[offs:offs+trsf_size]) else: # skipped packet, address needs to be set addr_set = 0 @@ -635,9 +673,11 @@ def writeMemory(self, addr, data): addr = addr + trsf_size lng = lng - trsf_size - mdebug(5, "Write %(len)d bytes at 0x%(addr)08X" % {'addr': addr, 'len': lng}) - self.cmdDownload(addr,lng) - return self.cmdSendData(data[offs:offs+lng]) # send last data packet + mdebug(5, "Write %(len)d bytes at 0x%(addr)08X" % {'addr': addr, + 'len': lng}) + self.cmdDownload(addr, lng) + return self.cmdSendData(data[offs:offs+lng]) # send last data packet + class Chip(object): def __init__(self, command_interface): @@ -646,26 +686,34 @@ def __init__(self, command_interface): # Some defaults. The child can override. self.flash_start_addr = 0x00000000 self.has_cmd_set_xosc = False + self.page_size = 2048 + + def page_to_addr(self, pages): + addresses = [] + for page in pages: + addresses.append(int(device.flash_start_addr) + + int(page)*self.page_size) + return addresses def crc(self, address, size): return getattr(self.command_interface, self.crc_cmd)(address, size) def disable_bootloader(self): - if not (conf['force'] or query_yes_no("Disabling the bootloader will prevent you from "\ - "using this script until you re-enable the bootloader "\ - "using JTAG. Do you want to continue?", "no")): + if not (conf['force'] or + query_yes_no("Disabling the bootloader will prevent you from " + "using this script until you re-enable the " + "bootloader using JTAG. Do you want to continue?", + "no")): raise Exception('Aborted by user.') - if PY3: - pattern = struct.pack('> 4 + if 0 < self.size <= 4: + self.size *= 0x20000 # in bytes + else: + self.size = 0x10000 # in bytes self.bootloader_address = self.flash_start_addr + self.size - ccfg_len - sram = 16 + ((((model[2] << 8) | model[3]) & 0x380) >> 5) # in KB + sram = (((model[2] << 8) | model[3]) & 0x380) >> 7 + sram = (2 - sram) << 3 if sram <= 1 else 32 # in KB pg = self.command_interface.cmdMemRead(FLASH_CTRL_DIECFG2) pg_major = (pg[2] & 0xF0) >> 4 + if pg_major == 0: + pg_major = 1 pg_minor = pg[2] & 0x0F - ieee_addr = self.command_interface.cmdMemRead(addr_ieee_address_primary + 4) - ieee_addr += self.command_interface.cmdMemRead(addr_ieee_address_primary) + ti_oui = bytearray([0x00, 0x12, 0x4B]) + ieee_addr = self.command_interface.cmdMemRead( + addr_ieee_address_primary) + ieee_addr_end = self.command_interface.cmdMemRead( + addr_ieee_address_primary + 4) + if ieee_addr[:3] == ti_oui: + ieee_addr += ieee_addr_end + else: + ieee_addr = ieee_addr_end + ieee_addr mdebug(5, "CC2538 PG%d.%d: %dKB Flash, %dKB SRAM, CCFG at 0x%08X" % (pg_major, pg_minor, self.size >> 10, sram, self.bootloader_address)) - mdebug(5, "Primary IEEE Address: %s" % (':'.join('%02X' % x for x in ieee_addr))) + mdebug(5, "Primary IEEE Address: %s" + % (':'.join('%02X' % x for x in ieee_addr))) def erase(self): - mdebug(5, "Erasing %s bytes starting at address 0x%08X" % (self.size, self.flash_start_addr)) - return self.command_interface.cmdEraseMemory(self.flash_start_addr, self.size) + mdebug(5, "Erasing %s bytes starting at address 0x%08X" + % (self.size, self.flash_start_addr)) + return self.command_interface.cmdEraseMemory(self.flash_start_addr, + self.size) def read_memory(self, addr): # CC2538's COMMAND_MEMORY_READ sends each 4-byte number in inverted @@ -709,6 +774,7 @@ def read_memory(self, addr): data = self.command_interface.cmdMemRead(addr) return bytearray([data[x] for x in range(3, -1, -1)]) + class CC26xx(Chip): # Class constants MISC_CONF_1 = 0x500010A0 @@ -720,6 +786,7 @@ def __init__(self, command_interface): super(CC26xx, self).__init__(command_interface) self.bootloader_dis_val = 0x00000000 self.crc_cmd = "cmdCRC32CC26xx" + self.page_size = 4096 ICEPICK_DEVICE_ID = 0x50001318 FCFG_USER_ID = 0x50001294 @@ -741,7 +808,14 @@ def __init__(self, command_interface): # Read FCFG1_USER_ID to get the package and supported protocols user_id = self.command_interface.cmdMemReadCC26xx(FCFG_USER_ID) - package = {0x00: '4x4mm', 0x01: '5x5mm', 0x02: '7x7mm'}.get(user_id[2] & 0x03, "Unknown") + package = {0x00: '4x4mm', + 0x01: '5x5mm', + 0x02: '7x7mm', + 0x03: 'Wafer', + 0x04: '2.7x2.7', + 0x05: '7x7mm Q1', + }.get(user_id[2] & 0x03, "Unknown") + protocols = user_id[1] >> 4 # We can now detect the exact device @@ -749,14 +823,20 @@ def __init__(self, command_interface): chip = self._identify_cc26xx(pg_rev, protocols) elif wafer_id == 0xB9BE: chip = self._identify_cc13xx(pg_rev, protocols) + elif wafer_id == 0xBB41: + chip = self._identify_cc13xx(pg_rev, protocols) + self.page_size = 8192 # Read flash size, calculate and store bootloader disable address - self.size = self.command_interface.cmdMemReadCC26xx(FLASH_SIZE)[0] * 4096 + self.size = self.command_interface.cmdMemReadCC26xx( + FLASH_SIZE)[0] * self.page_size self.bootloader_address = self.size - ccfg_len + bootloader_dis_offset - self.addr_ieee_address_secondary = self.size - ccfg_len + ieee_address_secondary_offset + self.addr_ieee_address_secondary = (self.size - ccfg_len + + ieee_address_secondary_offset) # RAM size - ramhwopt_size = self.command_interface.cmdMemReadCC26xx(PRCM_RAMHWOPT)[0] & 3 + ramhwopt_size = self.command_interface.cmdMemReadCC26xx( + PRCM_RAMHWOPT)[0] & 3 if ramhwopt_size == 3: sram = "20KB" elif ramhwopt_size == 2: @@ -765,13 +845,16 @@ def __init__(self, command_interface): sram = "Unknown" # Primary IEEE address. Stored with the MSB at the high address - ieee_addr = self.command_interface.cmdMemReadCC26xx(addr_ieee_address_primary + 4)[::-1] - ieee_addr += self.command_interface.cmdMemReadCC26xx(addr_ieee_address_primary)[::-1] + ieee_addr = self.command_interface.cmdMemReadCC26xx( + addr_ieee_address_primary + 4)[::-1] + ieee_addr += self.command_interface.cmdMemReadCC26xx( + addr_ieee_address_primary)[::-1] mdebug(5, "%s (%s): %dKB Flash, %s SRAM, CCFG.BL_CONFIG at 0x%08X" % (chip, package, self.size >> 10, sram, self.bootloader_address)) - mdebug(5, "Primary IEEE Address: %s" % (':'.join('%02X' % x for x in ieee_addr))) + mdebug(5, "Primary IEEE Address: %s" + % (':'.join('%02X' % x for x in ieee_addr))) def _identify_cc26xx(self, pg, protocols): chips_dict = { @@ -788,11 +871,20 @@ def _identify_cc26xx(self, pg, protocols): pg_str = "PG2.0" elif pg == 7: pg_str = "PG2.1" - elif pg == 8: - rev_minor = self.command_interface.cmdMemReadCC26xx(CC26xx.MISC_CONF_1)[0] + elif pg == 8 or pg == 0x0B: + # CC26x0 PG2.2+ or CC26x0R2 + rev_minor = self.command_interface.cmdMemReadCC26xx( + CC26xx.MISC_CONF_1)[0] if rev_minor == 0xFF: rev_minor = 0x00 - pg_str = "PG2.%d" % (2 + rev_minor,) + + if pg == 8: + # CC26x0 + pg_str = "PG2.%d" % (2 + rev_minor,) + elif pg == 0x0B: + # HW revision R2, update Chip name + chip_str += 'R2' + pg_str = "PG%d.%d" % (1 + (rev_minor // 10), rev_minor % 10) return "%s %s" % (chip_str, pg_str) @@ -803,8 +895,9 @@ def _identify_cc13xx(self, pg, protocols): if pg == 0: pg_str = "PG1.0" - elif pg == 2: - rev_minor = self.command_interface.cmdMemReadCC26xx(CC26xx.MISC_CONF_1)[0] + elif pg == 2 or pg == 3: + rev_minor = self.command_interface.cmdMemReadCC26xx( + CC26xx.MISC_CONF_1)[0] if rev_minor == 0xFF: rev_minor = 0x00 pg_str = "PG2.%d" % (rev_minor,) @@ -820,10 +913,14 @@ def read_memory(self, addr): # they are stored on the device return self.command_interface.cmdMemReadCC26xx(addr) + def query_yes_no(question, default="yes"): - valid = {"yes":True, "y":True, "ye":True, - "no":False, "n":False} - if default == None: + valid = {"yes": True, + "y": True, + "ye": True, + "no": False, + "n": False} + if default is None: prompt = " [y/n] " elif default == "yes": prompt = " [Y/n] " @@ -834,20 +931,18 @@ def query_yes_no(question, default="yes"): while True: sys.stdout.write(question + prompt) - if PY3: - choice = input().lower() - else: - choice = raw_input().lower() + choice = input().lower() if default is not None and choice == '': return valid[default] elif choice in valid: return valid[choice] else: - sys.stdout.write("Please respond with 'yes' or 'no' "\ + sys.stdout.write("Please respond with 'yes' or 'no' " "(or 'y' or 'n').\n") + # Convert the entered IEEE address into an integer -def parse_ieee_address (inaddr): +def parse_ieee_address(inaddr): try: return int(inaddr, 16) except ValueError: @@ -859,13 +954,57 @@ def parse_ieee_address (inaddr): if len(bytes) != 8: raise ValueError("Supplied IEEE address does not contain 8 bytes") addr = 0 - for i,b in zip(range(8), bytes): + for i, b in zip(range(8), bytes): try: addr += int(b, 16) << (56-(i*8)) except ValueError: raise ValueError("IEEE address contains invalid bytes") return addr + +def _parse_range_values(device, values): + if len(values) and len(values) < 3: + page_addr_range = [] + try: + for value in values: + try: + if int(value) % int(device.page_size) is not 0: + raise ValueError("Supplied addresses are not page_size: " + "{} aligned".format(device.page_size)) + page_addr_range.append(int(value)) + except ValueError: + if int(value, 16) % int(device.page_size) is not 0: + raise ValueError("Supplied addresses are not page_size: " + "{} aligned".format(device.page_size)) + page_addr_range.append(int(value, 16)) + return page_addr_range + except ValueError: + raise ValueError("Supplied value is not a page or an address") + else: + raise ValueError("Supplied range is neither a page or address range") + + +def parse_page_address_range(device, pg_range): + """Convert the address/page range into a start address and byte length""" + values = pg_range.split(',') + page_addr = [] + # check if first argument is character + if values[0].isalpha(): + values[0].lower() + if values[0] == 'p' or values[0] == 'page': + if values[0] == 'p': + values[1:] = device.page_to_addr(values[1:]) + elif values[0] != 'a' and values[0] != 'address': + raise ValueError("Prefix is neither a(address) or p(page)") + page_addr.extend(_parse_range_values(device, values[1:])) + else: + page_addr.extend(_parse_range_values(device, values)) + if len(page_addr) == 1: + return [page_addr[0], device.page_size] + else: + return [page_addr[0], (page_addr[1] - page_addr[0])] + + def print_version(): # Get the version using "git describe". try: @@ -873,63 +1012,77 @@ def print_version(): stdout=PIPE, stderr=PIPE) p.stderr.close() line = p.stdout.readlines()[0] - version = line.strip() + version = line.decode('utf-8').strip() except: - # We're not in a git repo, or git failed, use fixed version string. - version = VERSION_STRING + # We're not in a git repo, or git failed, use fixed version string. + version = __version__ print('%s %s' % (sys.argv[0], version)) + def usage(): - print("""Usage: %s [-DhqVfewvr] [-l length] [-p port] [-b baud] [-a addr] [-i addr] [--bootloader-active-high] [file.bin] - -h, --help This help - -q Quiet - -V Verbose - -f Force operation(s) without asking any questions - -e Erase (full) - -w Write - -v Verify (CRC32 check) - -r Read - -l length Length of read - -p port Serial port (default: first USB-like port in /dev) - -b baud Baud speed (default: 500000) - -a addr Target address - -i, --ieee-address addr Set the secondary 64 bit IEEE address - --bootloader-active-high Use active high signals to enter bootloader - -D, --disable-bootloader After finishing, disable the bootloader - --version Print script version + print("""Usage: %s [-DhqVfewvr] [-l length] [-p port] [-b baud] [-a addr] \ + [-i addr] [--bootloader-active-high] [--bootloader-invert-lines] [file.bin] + -h, --help This help + -q Quiet + -V Verbose + -f Force operation(s) without asking any questions + -e Mass erase + -E, --erase-page p/a,range Receives an address(a) range or page(p) range, + default is address(a) + eg: -E a,0x00000000,0x00001000, + -E p,1,4 + -w Write + -v Verify (CRC32 check) + -r Read + -l length Length of read + -p port Serial port (default: first USB-like port in /dev) + -b baud Baud speed (default: 500000) + -a addr Target address + -i, --ieee-address addr Set the secondary 64 bit IEEE address + --bootloader-active-high Use active high signals to enter bootloader + --bootloader-invert-lines Inverts the use of RTS and DTR to enter bootloader + -D, --disable-bootloader After finishing, disable the bootloader + --version Print script version Examples: ./%s -e -w -v example/main.bin ./%s -e -w -v --ieee-address 00:12:4b:aa:bb:cc:dd:ee example/main.bin - """ % (sys.argv[0],sys.argv[0],sys.argv[0])) + """ % (sys.argv[0], sys.argv[0], sys.argv[0])) if __name__ == "__main__": conf = { 'port': 'auto', 'baud': 500000, - 'force_speed' : 0, + 'force_speed': 0, 'address': None, 'force': 0, 'erase': 0, 'write': 0, + 'erase_page': 0, 'verify': 0, 'read': 0, 'len': 0x80000, - 'fname':'', + 'fname': '', 'ieee_address': 0, 'bootloader_active_high': False, + 'bootloader_invert_lines': False, 'disable-bootloader': 0 } # http://www.python.org/doc/2.5.2/lib/module-getopt.html try: - opts, args = getopt.getopt(sys.argv[1:], "DhqVfewvrp:b:a:l:i:", ['help', 'ieee-address=', 'disable-bootloader', 'bootloader-active-high', 'version']) + opts, args = getopt.getopt(sys.argv[1:], + "DhqVfeE:wvrp:b:a:l:i:", + ['help', 'ieee-address=','erase-page=', + 'disable-bootloader', + 'bootloader-active-high', + 'bootloader-invert-lines', 'version']) except getopt.GetoptError as err: # print help information and exit: - print(str(err)) # will print something like "option -a not recognized" + print(str(err)) # will print something like "option -a not recognized" usage() sys.exit(2) @@ -947,6 +1100,8 @@ def usage(): conf['erase'] = 1 elif o == '-w': conf['write'] = 1 + elif o == '-E' or o == '--erase-page': + conf['erase_page'] = str(a) elif o == '-v': conf['verify'] = 1 elif o == '-r': @@ -964,6 +1119,8 @@ def usage(): conf['ieee_address'] = str(a) elif o == '--bootloader-active-high': conf['bootloader_active_high'] = True + elif o == '--bootloader-invert-lines': + conf['bootloader_invert_lines'] = True elif o == '-D' or o == '--disable-bootloader': conf['disable-bootloader'] = 1 elif o == '--version': @@ -974,19 +1131,23 @@ def usage(): try: # Sanity checks - if conf['write'] or conf['read'] or conf['verify']: # check for input/output file + # check for input/output file + if conf['write'] or conf['read'] or conf['verify']: try: args[0] except: raise Exception('No file path given.') if conf['write'] and conf['read']: - if not ( conf['force'] or query_yes_no("You are reading and writing to the same file. This will overwrite your input file. "\ - "Do you want to continue?","no") ): + if not (conf['force'] or + query_yes_no("You are reading and writing to the same " + "file. This will overwrite your input file. " + "Do you want to continue?", "no")): raise Exception('Aborted by user.') - if conf['erase'] and conf['read'] and not conf['write']: - if not ( conf['force'] or query_yes_no("You are about to erase your target before reading. "\ - "Do you want to continue?","no") ): + if (conf['erase'] and conf['read']) or (conf['erase_page'] and conf['read']) and not conf['write']: + if not (conf['force'] or + query_yes_no("You are about to erase your target before " + "reading. Do you want to continue?", "no")): raise Exception('Aborted by user.') if conf['read'] and not conf['write'] and conf['verify']: @@ -1001,25 +1162,27 @@ def usage(): ports = [] # Get a list of all USB-like names in /dev - for name in ['tty.usbserial', 'ttyUSB', 'tty.usbmodem', 'tty.SLAB_USBtoUART']: + for name in ['ttyACM', + 'tty.usbserial', + 'ttyUSB', + 'tty.usbmodem', + 'tty.SLAB_USBtoUART']: ports.extend(glob.glob('/dev/%s*' % name)) ports = sorted(ports) if ports: - port = ports[-1] - if len(ports) > 1: - sys.stderr.write("More than one port detected, using the last one, " - "%s\n" % (port,)) - conf['port'] = port + # Found something - take it + conf['port'] = ports[0] else: raise Exception('No serial port found.') cmd = CommandInterface() cmd.open(conf['port'], conf['baud']) - cmd.invoke_bootloader(conf['bootloader_active_high']) - mdebug(5, "Opening port %(port)s, baud %(baud)d" % {'port':conf['port'], - 'baud':conf['baud']}) + cmd.invoke_bootloader(conf['bootloader_active_high'], + conf['bootloader_invert_lines']) + mdebug(5, "Opening port %(port)s, baud %(baud)d" + % {'port': conf['port'], 'baud': conf['baud']}) if conf['write'] or conf['verify']: mdebug(5, "Reading data from %s" % args[0]) firmware = FirmwareFile(args[0]) @@ -1027,10 +1190,12 @@ def usage(): mdebug(5, "Connecting to target...") if not cmd.sendSynch(): - raise CmdException("Can't connect to target. Ensure boot loader is started. (no answer on synch sequence)") + raise CmdException("Can't connect to target. Ensure boot loader " + "is started. (no answer on synch sequence)") # if (cmd.cmdPing() != 1): - # raise CmdException("Can't connect to target. Ensure boot loader is started. (no answer on ping command)") + # raise CmdException("Can't connect to target. Ensure boot loader " + # "is started. (no answer on ping command)") chip_id = cmd.cmdGetChipId() chip_id_str = CHIP_ID_STRS.get(chip_id, None) @@ -1047,54 +1212,68 @@ def usage(): conf['address'] = device.flash_start_addr if conf['force_speed'] != 1 and device.has_cmd_set_xosc: - if cmd.cmdSetXOsc(): #switch to external clock source + if cmd.cmdSetXOsc(): # switch to external clock source cmd.close() conf['baud'] = 1000000 cmd.open(conf['port'], conf['baud']) - mdebug(6, "Opening port %(port)s, baud %(baud)d" % {'port':conf['port'], 'baud':conf['baud']}) + mdebug(6, "Opening port %(port)s, baud %(baud)d" + % {'port': conf['port'], 'baud': conf['baud']}) mdebug(6, "Reconnecting to target at higher speed...") if (cmd.sendSynch() != 1): - raise CmdException("Can't connect to target after clock source switch. (Check external crystal)") + raise CmdException("Can't connect to target after clock " + "source switch. (Check external " + "crystal)") else: - raise CmdException("Can't switch target to external clock source. (Try forcing speed)") + raise CmdException("Can't switch target to external clock " + "source. (Try forcing speed)") if conf['erase']: - # we only do full erase for now + mdebug(5, " Performing mass erase") if device.erase(): mdebug(5, " Erase done") else: raise CmdException("Erase failed") + if conf['erase_page']: + erase_range = parse_page_address_range(device, conf['erase_page']) + mdebug(5, "Erasing %d bytes at addres 0x%x" + % (erase_range[1], erase_range[0])) + cmd.cmdEraseMemory(erase_range[0], erase_range[1]) + mdebug(5, " Partial erase done ") + if conf['write']: - # TODO: check if boot loader back-door is open, need to read flash size first to get address + # TODO: check if boot loader back-door is open, need to read + # flash size first to get address if cmd.writeMemory(conf['address'], firmware.bytes): mdebug(5, " Write done ") else: raise CmdException("Write failed ") if conf['verify']: - mdebug(5,"Verifying by comparing CRC32 calculations.") + mdebug(5, "Verifying by comparing CRC32 calculations.") crc_local = firmware.crc32() - crc_target = device.crc(conf['address'], len(firmware.bytes)) #CRC of target will change according to length input file + # CRC of target will change according to length input file + crc_target = device.crc(conf['address'], len(firmware.bytes)) if crc_local == crc_target: mdebug(5, " Verified (match: 0x%08x)" % crc_local) else: cmd.cmdReset() - raise Exception("NO CRC32 match: Local = 0x%x, Target = 0x%x" % (crc_local,crc_target)) + raise Exception("NO CRC32 match: Local = 0x%x, " + "Target = 0x%x" % (crc_local, crc_target)) if conf['ieee_address'] != 0: ieee_addr = parse_ieee_address(conf['ieee_address']) - if PY3: - mdebug(5, "Setting IEEE address to %s" % (':'.join(['%02x' % b for b in struct.pack('>Q', ieee_addr)]))) - ieee_addr_bytes = struct.pack('Q', ieee_addr)]))) - ieee_addr_bytes = [ord(b) for b in struct.pack('Q', ieee_addr)]))) + ieee_addr_bytes = struct.pack('> 2): - rdata = device.read_memory(conf['address'] + (i * 4)) #reading 4 bytes at a time - mdebug(5, " 0x%x: 0x%02x%02x%02x%02x" % (conf['address'] + (i * 4), rdata[0], rdata[1], rdata[2], rdata[3]), '\r') + # reading 4 bytes at a time + rdata = device.read_memory(conf['address'] + (i * 4)) + mdebug(5, " 0x%x: 0x%02x%02x%02x%02x" + % (conf['address'] + (i * 4), rdata[0], rdata[1], + rdata[2], rdata[3]), '\r') f.write(rdata) f.close() mdebug(5, " Read done ") diff --git a/nesc/whip6/platforms/tools/programmer/ccbsl/intelhex/__init__.py b/nesc/whip6/platforms/tools/programmer/ccbsl/intelhex/__init__.py index d4459c7..c6423a6 100644 --- a/nesc/whip6/platforms/tools/programmer/ccbsl/intelhex/__init__.py +++ b/nesc/whip6/platforms/tools/programmer/ccbsl/intelhex/__init__.py @@ -1,4 +1,4 @@ -# Copyright (c) 2005-2012, Alexander Belchenko +# Copyright (c) 2005-2018, Alexander Belchenko # All rights reserved. # # Redistribution and use in source and binary forms, @@ -31,23 +31,37 @@ # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, # EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -'''Intel HEX file format reader and converter. - -@author Alexander Belchenko (bialix AT ukr net) -@version 1.4 -''' - +'''Intel HEX format manipulation library.''' __docformat__ = "javadoc" - from array import array from binascii import hexlify, unhexlify from bisect import bisect_right import os import sys -from compat import asbytes, asstr +from intelhex.compat import ( + IntTypes, + StrType, + StringIO, + array_tobytes, + asbytes, + asstr, + dict_items_g, + dict_keys, + dict_keys_g, + range_g, + range_l, + ) + +from intelhex.getsizeof import total_size + + +class _DeprecatedParam(object): + pass + +_DEPRECATED = _DeprecatedParam() class IntelHex(object): @@ -61,7 +75,7 @@ def __init__(self, source=None): (file name of HEX file, file object, addr dict or other IntelHex object) ''' - #public members + # public members self.padding = 0x0FF # Start Address self.start_addr = None @@ -71,7 +85,7 @@ def __init__(self, source=None): self._offset = 0 if source is not None: - if isinstance(source, basestring) or getattr(source, "read", None): + if isinstance(source, StrType) or getattr(source, "read", None): # load hex file self.loadhex(source) elif isinstance(source, dict): @@ -126,7 +140,7 @@ def _decode_record(self, s, line=0): if record_type == 0: # data record addr += self._offset - for i in xrange(4, 4+record_length): + for i in range_g(4, 4+record_length): if not self._buf.get(addr, None) is None: raise AddressOverlapError(address=addr, line=line) self._buf[addr] = bin[i] @@ -212,7 +226,7 @@ def loadbin(self, fobj, offset=0): """ fread = getattr(fobj, "read", None) if fread is None: - f = file(fobj, "rb") + f = open(fobj, "rb") fread = f.read fclose = f.close else: @@ -257,8 +271,8 @@ def fromdict(self, dikt): start_addr = s.get('start_addr') if start_addr is not None: del s['start_addr'] - for k in s.keys(): - if type(k) not in (int, long) or k < 0: + for k in dict_keys_g(s): + if type(k) not in IntTypes or k < 0: raise ValueError('Source dictionary should have only int keys') self._buf.update(s) if start_addr is not None: @@ -273,14 +287,18 @@ def frombytes(self, bytes, offset=0): offset += 1 def _get_start_end(self, start=None, end=None, size=None): - """Return default values for start and end if they are None + """Return default values for start and end if they are None. + If this IntelHex object is empty then it's error to + invoke this method with both start and end as None. """ + if (start,end) == (None,None) and self._buf == {}: + raise EmptyIntelHexError if size is not None: if None not in (start, end): raise ValueError("tobinarray: you can't use start,end and size" " arguments in the same time") if (start, end) == (None, None): - start = min(self._buf.keys()) + start = self.minaddr() if start is not None: end = start + size - 1 else: @@ -290,69 +308,100 @@ def _get_start_end(self, start=None, end=None, size=None): "for given end address (%d)" % (size,end)) else: if start is None: - start = min(self._buf.keys()) + start = self.minaddr() if end is None: - end = max(self._buf.keys()) + end = self.maxaddr() if start > end: start, end = end, start return start, end - def tobinarray(self, start=None, end=None, pad=None, size=None): + def tobinarray(self, start=None, end=None, pad=_DEPRECATED, size=None): ''' Convert this object to binary form as array. If start and end unspecified, they will be inferred from the data. @param start start address of output bytes. @param end end address of output bytes (inclusive). - @param pad fill empty spaces with this value - (if None used self.padding). + @param pad [DEPRECATED PARAMETER, please use self.padding instead] + fill empty spaces with this value + (if pad is None then this method uses self.padding). @param size size of the block, used with start or end parameter. @return array of unsigned char data. ''' + if not isinstance(pad, _DeprecatedParam): + print ("IntelHex.tobinarray: 'pad' parameter is deprecated.") + if pad is not None: + print ("Please, use IntelHex.padding attribute instead.") + else: + print ("Please, don't pass it explicitly.") + print ("Use syntax like this: ih.tobinarray(start=xxx, end=yyy, size=zzz)") + else: + pad = None + return self._tobinarray_really(start, end, pad, size) + + def _tobinarray_really(self, start, end, pad, size): + """Return binary array.""" if pad is None: pad = self.padding - bin = array('B') - if self._buf == {} and None in (start, end): return bin - if size is not None and size <= 0: raise ValueError("tobinarray: wrong value for size") - start, end = self._get_start_end(start, end, size) - - for i in xrange(start, end+1): + for i in range_g(start, end+1): bin.append(self._buf.get(i, pad)) - return bin - def tobinstr(self, start=None, end=None, pad=0xFF, size=None): - ''' Convert to binary form and return as a string. + def tobinstr(self, start=None, end=None, pad=_DEPRECATED, size=None): + ''' Convert to binary form and return as binary string. @param start start address of output bytes. @param end end address of output bytes (inclusive). - @param pad fill empty spaces with this value - (if None used self.padding). + @param pad [DEPRECATED PARAMETER, please use self.padding instead] + fill empty spaces with this value + (if pad is None then this method uses self.padding). @param size size of the block, used with start or end parameter. - @return string of binary data. + @return bytes string of binary data. ''' - return asstr(self.tobinarray(start, end, pad, size).tostring()) + if not isinstance(pad, _DeprecatedParam): + print ("IntelHex.tobinstr: 'pad' parameter is deprecated.") + if pad is not None: + print ("Please, use IntelHex.padding attribute instead.") + else: + print ("Please, don't pass it explicitly.") + print ("Use syntax like this: ih.tobinstr(start=xxx, end=yyy, size=zzz)") + else: + pad = None + return self._tobinstr_really(start, end, pad, size) - def tobinfile(self, fobj, start=None, end=None, pad=0xFF, size=None): + def _tobinstr_really(self, start, end, pad, size): + return array_tobytes(self._tobinarray_really(start, end, pad, size)) + + def tobinfile(self, fobj, start=None, end=None, pad=_DEPRECATED, size=None): '''Convert to binary and write to file. @param fobj file name or file object for writing output bytes. @param start start address of output bytes. @param end end address of output bytes (inclusive). - @param pad fill empty spaces with this value - (if None used self.padding). + @param pad [DEPRECATED PARAMETER, please use self.padding instead] + fill empty spaces with this value + (if pad is None then this method uses self.padding). @param size size of the block, used with start or end parameter. ''' + if not isinstance(pad, _DeprecatedParam): + print ("IntelHex.tobinfile: 'pad' parameter is deprecated.") + if pad is not None: + print ("Please, use IntelHex.padding attribute instead.") + else: + print ("Please, don't pass it explicitly.") + print ("Use syntax like this: ih.tobinfile(start=xxx, end=yyy, size=zzz)") + else: + pad = None if getattr(fobj, "write", None) is None: - fobj = file(fobj, "wb") + fobj = open(fobj, "wb") close_fd = True else: close_fd = False - fobj.write(self.tobinstr(start, end, pad, size)) + fobj.write(self._tobinstr_really(start, end, pad, size)) if close_fd: fobj.close() @@ -372,7 +421,7 @@ def addresses(self): '''Returns all used addresses in sorted order. @return list of occupied data addresses in sorted order. ''' - aa = self._buf.keys() + aa = dict_keys(self._buf) aa.sort() return aa @@ -380,7 +429,7 @@ def minaddr(self): '''Get minimal address of HEX content. @return minimal address or None if no data ''' - aa = self._buf.keys() + aa = dict_keys(self._buf) if aa == []: return None else: @@ -390,7 +439,7 @@ def maxaddr(self): '''Get maximal address of HEX content. @return maximal address or None if no data ''' - aa = self._buf.keys() + aa = dict_keys(self._buf) if aa == []: return None else: @@ -403,19 +452,19 @@ def __getitem__(self, addr): if no data found. ''' t = type(addr) - if t in (int, long): + if t in IntTypes: if addr < 0: raise TypeError('Address should be >= 0.') return self._buf.get(addr, self.padding) elif t == slice: - addresses = self._buf.keys() + addresses = dict_keys(self._buf) ih = IntelHex() if addresses: addresses.sort() start = addr.start or addresses[0] stop = addr.stop or (addresses[-1]+1) step = addr.step or 1 - for i in xrange(start, stop, step): + for i in range_g(start, stop, step): x = self._buf.get(i) if x is not None: ih[i] = x @@ -426,19 +475,18 @@ def __getitem__(self, addr): def __setitem__(self, addr, byte): """Set byte at address.""" t = type(addr) - if t in (int, long): + if t in IntTypes: if addr < 0: raise TypeError('Address should be >= 0.') self._buf[addr] = byte elif t == slice: - addresses = self._buf.keys() if not isinstance(byte, (list, tuple)): raise ValueError('Slice operation expects sequence of bytes') start = addr.start stop = addr.stop step = addr.step or 1 if None not in (start, stop): - ra = range(start, stop, step) + ra = range_l(start, stop, step) if len(ra) != len(byte): raise ValueError('Length of bytes sequence does not match ' 'address range') @@ -453,7 +501,7 @@ def __setitem__(self, addr, byte): if stop < 0: raise TypeError('stop address cannot be negative') j = 0 - for i in xrange(start, stop, step): + for i in range_g(start, stop, step): self._buf[i] = byte[j] j += 1 else: @@ -462,18 +510,18 @@ def __setitem__(self, addr, byte): def __delitem__(self, addr): """Delete byte at address.""" t = type(addr) - if t in (int, long): + if t in IntTypes: if addr < 0: raise TypeError('Address should be >= 0.') del self._buf[addr] elif t == slice: - addresses = self._buf.keys() + addresses = dict_keys(self._buf) if addresses: addresses.sort() start = addr.start or addresses[0] stop = addr.stop or (addresses[-1]+1) step = addr.step or 1 - for i in xrange(start, stop, step): + for i in range_g(start, stop, step): x = self._buf.get(i) if x is not None: del self._buf[i] @@ -482,9 +530,21 @@ def __delitem__(self, addr): def __len__(self): """Return count of bytes with real values.""" - return len(self._buf.keys()) + return len(dict_keys(self._buf)) + + def _get_eol_textfile(eolstyle, platform): + if eolstyle == 'native': + return '\n' + elif eolstyle == 'CRLF': + if platform != 'win32': + return '\r\n' + else: + return '\n' + else: + raise ValueError("wrong eolstyle %s" % repr(eolstyle)) + _get_eol_textfile = staticmethod(_get_eol_textfile) - def write_hex_file(self, f, write_start_addr=True): + def write_hex_file(self, f, write_start_addr=True, eolstyle='native', byte_count=16): """Write data to file f in HEX format. @param f filename or file-like object for writing @@ -492,30 +552,38 @@ def write_hex_file(self, f, write_start_addr=True): record to file (enabled by default). If there is no start address in obj, nothing will be written regardless of this setting. + @param eolstyle can be used to force CRLF line-endings + for output file on different platforms. + Supported eol styles: 'native', 'CRLF'. + @param byte_count number of bytes in the data field """ + if byte_count > 255 or byte_count < 1: + raise ValueError("wrong byte_count value: %s" % byte_count) fwrite = getattr(f, "write", None) if fwrite: fobj = f fclose = None else: - fobj = file(f, 'w') + fobj = open(f, 'w') fwrite = fobj.write fclose = fobj.close + eol = IntelHex._get_eol_textfile(eolstyle, sys.platform) + # Translation table for uppercasing hex ascii string. # timeit shows that using hexstr.translate(table) # is faster than hexstr.upper(): # 0.452ms vs. 0.652ms (translate vs. upper) if sys.version_info[0] >= 3: - table = bytes(range(256)).upper() + # Python 3 + table = bytes(range_l(256)).upper() else: - table = ''.join(chr(i).upper() for i in range(256)) - - + # Python 2 + table = ''.join(chr(i).upper() for i in range_g(256)) # start address record if any if self.start_addr and write_start_addr: - keys = self.start_addr.keys() + keys = dict_keys(self.start_addr) keys.sort() bin = array('B', asbytes('\0'*9)) if keys == ['CS','IP']: @@ -532,8 +600,8 @@ def write_hex_file(self, f, write_start_addr=True): bin[7] = ip & 0x0FF bin[8] = (-sum(bin)) & 0x0FF # chksum fwrite(':' + - asstr(hexlify(bin.tostring()).translate(table)) + - '\n') + asstr(hexlify(array_tobytes(bin)).translate(table)) + + eol) elif keys == ['EIP']: # Start Linear Address Record bin[0] = 4 # reclen @@ -547,15 +615,15 @@ def write_hex_file(self, f, write_start_addr=True): bin[7] = eip & 0x0FF bin[8] = (-sum(bin)) & 0x0FF # chksum fwrite(':' + - asstr(hexlify(bin.tostring()).translate(table)) + - '\n') + asstr(hexlify(array_tobytes(bin)).translate(table)) + + eol) else: if fclose: fclose() raise InvalidStartAddressValueError(start_addr=self.start_addr) # data - addresses = self._buf.keys() + addresses = dict_keys(self._buf) addresses.sort() addr_len = len(addresses) if addr_len: @@ -584,14 +652,14 @@ def write_hex_file(self, f, write_start_addr=True): bin[5] = b[1] # lsb of high_ofs bin[6] = (-sum(bin)) & 0x0FF # chksum fwrite(':' + - asstr(hexlify(bin.tostring()).translate(table)) + - '\n') + asstr(hexlify(array_tobytes(bin)).translate(table)) + + eol) while True: # produce one record low_addr = cur_addr & 0x0FFFF # chain_len off by 1 - chain_len = min(15, 65535-low_addr, maxaddr-cur_addr) + chain_len = min(byte_count-1, 65535-low_addr, maxaddr-cur_addr) # search continuous chain stop_addr = cur_addr + chain_len @@ -613,7 +681,7 @@ def write_hex_file(self, f, write_start_addr=True): bin[2] = b[1] # lsb of low_addr bin[3] = 0 # rectype try: # if there is small holes we'll catch them - for i in range(chain_len): + for i in range_g(chain_len): bin[4+i] = self._buf[cur_addr+i] except KeyError: # we catch a hole so we should shrink the chain @@ -622,8 +690,8 @@ def write_hex_file(self, f, write_start_addr=True): bin[0] = chain_len bin[4+chain_len] = (-sum(bin)) & 0x0FF # chksum fwrite(':' + - asstr(hexlify(bin.tostring()).translate(table)) + - '\n') + asstr(hexlify(array_tobytes(bin)).translate(table)) + + eol) # adjust cur_addr/cur_ix cur_ix += chain_len @@ -637,18 +705,19 @@ def write_hex_file(self, f, write_start_addr=True): break # end-of-file record - fwrite(":00000001FF\n") + fwrite(":00000001FF"+eol) if fclose: fclose() - def tofile(self, fobj, format): + def tofile(self, fobj, format, byte_count=16): """Write data to hex or bin file. Preferred method over tobin or tohex. @param fobj file name or file-like object @param format file format ("hex" or "bin") + @param byte_count bytes per line """ if format == 'hex': - self.write_hex_file(fobj) + self.write_hex_file(fobj, byte_count=byte_count) elif format == 'bin': self.tobinfile(fobj) else: @@ -658,25 +727,26 @@ def tofile(self, fobj, format): def gets(self, addr, length): """Get string of bytes from given address. If any entries are blank from addr through addr+length, a NotEnoughDataError exception will - be raised. Padding is not used.""" + be raised. Padding is not used. + """ a = array('B', asbytes('\0'*length)) try: - for i in xrange(length): + for i in range_g(length): a[i] = self._buf[addr+i] except KeyError: raise NotEnoughDataError(address=addr, length=length) - return asstr(a.tostring()) + return array_tobytes(a) def puts(self, addr, s): """Put string of bytes at given address. Will overwrite any previous entries. """ a = array('B', asbytes(s)) - for i in xrange(len(a)): + for i in range_g(len(a)): self._buf[addr+i] = a[i] def getsz(self, addr): - """Get zero-terminated string from given address. Will raise + """Get zero-terminated bytes string from given address. Will raise NotEnoughDataError exception if a hole is encountered before a 0. """ i = 0 @@ -691,20 +761,44 @@ def getsz(self, addr): return self.gets(addr, i) def putsz(self, addr, s): - """Put string in object at addr and append terminating zero at end.""" + """Put bytes string in object at addr and append terminating zero at end.""" self.puts(addr, s) self._buf[addr+len(s)] = 0 - def dump(self, tofile=None): + def find(self, sub, start=None, end=None): + """Return the lowest index in self[start:end] where subsection sub is found. + Optional arguments start and end are interpreted as in slice notation. + + @param sub bytes-like subsection to find + @param start start of section to search within (optional) + @param end end of section to search within (optional) + """ + sub = bytes(sub) + for start, end in self[slice(start,end)].segments(): + b = self.gets(start, end-start) + i = b.find(sub) + if i != -1: + return start+i + return -1 + + def dump(self, tofile=None, width=16, withpadding=False): """Dump object content to specified file object or to stdout if None. Format is a hexdump with some header information at the beginning, addresses on the left, and data on right. - @param tofile file-like object to dump to + @param tofile file-like object to dump to + @param width number of bytes per line (i.e. columns) + @param withpadding print padding character instead of '--' + @raise ValueError if width is not a positive integer """ + if not isinstance(width,int) or width < 1: + raise ValueError('width must be a positive integer.') + # The integer can be of float type - does not work with bit operations + width = int(width) if tofile is None: tofile = sys.stdout + # start addr possibly if self.start_addr is not None: cs = self.start_addr.get('CS') @@ -717,22 +811,26 @@ def dump(self, tofile=None): else: tofile.write('start_addr = %r\n' % start_addr) # actual data - addresses = self._buf.keys() + addresses = dict_keys(self._buf) if addresses: addresses.sort() minaddr = addresses[0] maxaddr = addresses[-1] - startaddr = int(minaddr>>4)*16 - endaddr = int((maxaddr>>4)+1)*16 - maxdigits = max(len(str(endaddr)), 4) + startaddr = (minaddr // width) * width + endaddr = ((maxaddr // width) + 1) * width + maxdigits = max(len(hex(endaddr)) - 2, 4) # Less 2 to exclude '0x' templa = '%%0%dX' % maxdigits - range16 = range(16) - for i in xrange(startaddr, endaddr, 16): + rangewidth = range_l(width) + if withpadding: + pad = self.padding + else: + pad = None + for i in range_g(startaddr, endaddr, width): tofile.write(templa % i) tofile.write(' ') s = [] - for j in range16: - x = self._buf.get(i+j) + for j in rangewidth: + x = self._buf.get(i+j, pad) if x is not None: tofile.write(' %02X' % x) if 32 <= x < 127: # GNU less does not like 0x7F (128 decimal) so we'd better show it as dot @@ -744,31 +842,32 @@ def dump(self, tofile=None): s.append(' ') tofile.write(' |' + ''.join(s) + '|\n') - def merge(this, other, overlap='error'): - """Merge content of other IntelHex object to this object. + def merge(self, other, overlap='error'): + """Merge content of other IntelHex object into current object (self). @param other other IntelHex object. @param overlap action on overlap of data or starting addr: - error: raising OverlapError; - - ignore: ignore other data and keep this data + - ignore: ignore other data and keep current data in overlapping region; - - replace: replace this data with other data + - replace: replace data with other data in overlapping region. @raise TypeError if other is not instance of IntelHex - @raise ValueError if other is the same object as this + @raise ValueError if other is the same object as self + (it can't merge itself) @raise ValueError if overlap argument has incorrect value @raise AddressOverlapError on overlapped data """ # check args if not isinstance(other, IntelHex): raise TypeError('other should be IntelHex object') - if other is this: + if other is self: raise ValueError("Can't merge itself") if overlap not in ('error', 'ignore', 'replace'): raise ValueError("overlap argument should be either " "'error', 'ignore' or 'replace'") # merge data - this_buf = this._buf + this_buf = self._buf other_buf = other._buf for i in other_buf: if i in this_buf: @@ -779,24 +878,53 @@ def merge(this, other, overlap='error'): continue this_buf[i] = other_buf[i] # merge start_addr - if this.start_addr != other.start_addr: - if this.start_addr is None: # set start addr from other - this.start_addr = other.start_addr - elif other.start_addr is None: # keep this start addr + if self.start_addr != other.start_addr: + if self.start_addr is None: # set start addr from other + self.start_addr = other.start_addr + elif other.start_addr is None: # keep existing start addr pass else: # conflict if overlap == 'error': raise AddressOverlapError( 'Starting addresses are different') elif overlap == 'replace': - this.start_addr = other.start_addr + self.start_addr = other.start_addr + + def segments(self, min_gap=1): + """Return a list of ordered tuple objects, representing contiguous occupied data addresses. + Each tuple has a length of two and follows the semantics of the range and xrange objects. + The second entry of the tuple is always an integer greater than the first entry. + @param min_gap the minimum gap size between data in order to separate the segments + """ + addresses = self.addresses() + if not addresses: + return [] + elif len(addresses) == 1: + return([(addresses[0], addresses[0]+1)]) + adjacent_differences = [(b - a) for (a, b) in zip(addresses[:-1], addresses[1:])] + breaks = [i for (i, x) in enumerate(adjacent_differences) if x > min_gap] + endings = [addresses[b] for b in breaks] + endings.append(addresses[-1]) + beginnings = [addresses[b+1] for b in breaks] + beginnings.insert(0, addresses[0]) + return [(a, b+1) for (a, b) in zip(beginnings, endings)] + + def get_memory_size(self): + """Returns the approximate memory footprint for data.""" + n = sys.getsizeof(self) + n += sys.getsizeof(self.padding) + n += total_size(self.start_addr) + n += total_size(self._buf) + n += sys.getsizeof(self._offset) + return n + #/IntelHex class IntelHex16bit(IntelHex): - """Access to data as 16-bit words.""" + """Access to data as 16-bit words. Intended to use with Microchip HEX files.""" - def __init__(self, source): + def __init__(self, source=None): """Construct class from HEX file or from instance of ordinary IntelHex class. If IntelHex object is passed as source, the original IntelHex object should not be used @@ -815,6 +943,9 @@ def __init__(self, source): # private members self._buf = source._buf self._offset = source._offset + elif isinstance(source, dict): + raise IntelHexError("IntelHex16bit does not support initialization from dictionary yet.\n" + "Patches are welcome.") else: IntelHex.__init__(self, source) @@ -856,7 +987,7 @@ def minaddr(self): @return minimal address used in this object ''' - aa = self._buf.keys() + aa = dict_keys(self._buf) if aa == []: return 0 else: @@ -867,16 +998,41 @@ def maxaddr(self): @return maximal address used in this object ''' - aa = self._buf.keys() + aa = dict_keys(self._buf) if aa == []: return 0 else: return max(aa)>>1 + def tobinarray(self, start=None, end=None, size=None): + '''Convert this object to binary form as array (of 2-bytes word data). + If start and end unspecified, they will be inferred from the data. + @param start start address of output data. + @param end end address of output data (inclusive). + @param size size of the block (number of words), + used with start or end parameter. + @return array of unsigned short (uint16_t) data. + ''' + bin = array('H') + + if self._buf == {} and None in (start, end): + return bin + + if size is not None and size <= 0: + raise ValueError("tobinarray: wrong value for size") + + start, end = self._get_start_end(start, end, size) + + for addr in range_g(start, end+1): + bin.append(self[addr]) + + return bin + + #/class IntelHex16bit -def hex2bin(fin, fout, start=None, end=None, size=None, pad=0xFF): +def hex2bin(fin, fout, start=None, end=None, size=None, pad=None): """Hex-to-Bin convertor engine. @return 0 if all OK @@ -889,7 +1045,8 @@ def hex2bin(fin, fout, start=None, end=None, size=None, pad=0xFF): """ try: h = IntelHex(fin) - except HexReaderError, e: + except HexReaderError: + e = sys.exc_info()[1] # current exception txt = "ERROR: bad HEX file: %s" % str(e) print(txt) return 1 @@ -907,8 +1064,12 @@ def hex2bin(fin, fout, start=None, end=None, size=None, pad=0xFF): start = 0 try: - h.tobinfile(fout, start, end, pad) - except IOError, e: + if pad is not None: + # using .padding attribute rather than pad argument to function call + h.padding = pad + h.tobinfile(fout, start, end) + except IOError: + e = sys.exc_info()[1] # current exception txt = "ERROR: Could not write to file: %s: %s" % (fout, str(e)) print(txt) return 1 @@ -928,14 +1089,16 @@ def bin2hex(fin, fout, offset=0): h = IntelHex() try: h.loadbin(fin, offset) - except IOError, e: + except IOError: + e = sys.exc_info()[1] # current exception txt = 'ERROR: unable to load bin file:', str(e) print(txt) return 1 try: h.tofile(fout, format='hex') - except IOError, e: + except IOError: + e = sys.exc_info()[1] # current exception txt = "ERROR: Could not write to file: %s: %s" % (fout, str(e)) print(txt) return 1 @@ -956,7 +1119,6 @@ def diff_dumps(ih1, ih2, tofile=None, name1="a", name2="b", n_context=3): @param n_context number of context lines in the unidiff output """ def prepare_lines(ih): - from cStringIO import StringIO sio = StringIO() ih.dump(sio) dump = sio.getvalue() @@ -987,7 +1149,7 @@ def _from_bytes(bytes): # calculate checksum s = (-sum(bytes)) & 0x0FF bin = array('B', bytes + [s]) - return ':' + asstr(hexlify(bin.tostring())).upper() + return ':' + asstr(hexlify(array_tobytes(bin))).upper() _from_bytes = staticmethod(_from_bytes) def data(offset, bytes): @@ -1071,7 +1233,7 @@ def _get_file_and_addr_range(s, _support_drive_letter=None): _support_drive_letter = (os.name == 'nt') drive = '' if _support_drive_letter: - if s[1:2] == ':' and s[0].upper() in ''.join([chr(i) for i in range(ord('A'), ord('Z')+1)]): + if s[1:2] == ':' and s[0].upper() in ''.join([chr(i) for i in range_g(ord('A'), ord('Z')+1)]): drive = s[:2] s = s[2:] parts = s.split(':') @@ -1118,6 +1280,7 @@ def ascii_hex_to_int(ascii): # _EndOfFile - it's not real error, used internally by hex reader as signal that EOF record found # BadAccess16bit - not enough data to read 16 bit value (deprecated, see NotEnoughDataError) # NotEnoughDataError - not enough data to read N contiguous bytes +# EmptyIntelHexError - requested operation cannot be performed with empty object class IntelHexError(Exception): '''Base Exception class for IntelHex module''' @@ -1128,7 +1291,7 @@ def __init__(self, msg=None, **kw): """Initialize the Exception with the given message. """ self.msg = msg - for key, value in kw.items(): + for key, value in dict_items_g(kw): setattr(self, key, value) def __str__(self): @@ -1137,7 +1300,8 @@ def __str__(self): return self.msg try: return self._fmt % self.__dict__ - except (NameError, ValueError, KeyError), e: + except (NameError, ValueError, KeyError): + e = sys.exc_info()[1] # current exception return 'Unprintable exception %s: %s' \ % (repr(e), str(e)) @@ -1203,3 +1367,6 @@ class NotEnoughDataError(IntelHexError): class BadAccess16bit(NotEnoughDataError): _fmt = 'Bad access at 0x%(address)X: not enough data to read 16 bit value' + +class EmptyIntelHexError(IntelHexError): + _fmt = "Requested operation cannot be executed with empty object" diff --git a/nesc/whip6/platforms/tools/programmer/ccbsl/intelhex/__main__.py b/nesc/whip6/platforms/tools/programmer/ccbsl/intelhex/__main__.py new file mode 100644 index 0000000..bb9d559 --- /dev/null +++ b/nesc/whip6/platforms/tools/programmer/ccbsl/intelhex/__main__.py @@ -0,0 +1,39 @@ +# Copyright (c) 2016-2018, Alexander Belchenko +# All rights reserved. +# +# Redistribution and use in source and binary forms, +# with or without modification, are permitted provided +# that the following conditions are met: +# +# * Redistributions of source code must retain +# the above copyright notice, this list of conditions +# and the following disclaimer. +# * Redistributions in binary form must reproduce +# the above copyright notice, this list of conditions +# and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# * Neither the name of the author nor the names +# of its contributors may be used to endorse +# or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, +# BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY +# AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. +# IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, +# OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, +# OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED +# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, +# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +if __name__ == '__main__': + print("Welcome to IntelHex Python library.") + print() + print("The intelhex package has some executable points:") + print(" python -m intelhex.test -- easy way to run unit tests.") + print(" python -m intelhex.bench -- run benchmarks.") diff --git a/nesc/whip6/platforms/tools/programmer/ccbsl/intelhex/__version__.py b/nesc/whip6/platforms/tools/programmer/ccbsl/intelhex/__version__.py new file mode 100644 index 0000000..5ce4f0b --- /dev/null +++ b/nesc/whip6/platforms/tools/programmer/ccbsl/intelhex/__version__.py @@ -0,0 +1,3 @@ +# IntelHex library version information +version_info = (2, 2, 1) +version_str = '.'.join([str(i) for i in version_info]) diff --git a/nesc/whip6/platforms/tools/programmer/ccbsl/intelhex/bench.py b/nesc/whip6/platforms/tools/programmer/ccbsl/intelhex/bench.py index 0d8ed3e..df48447 100644 --- a/nesc/whip6/platforms/tools/programmer/ccbsl/intelhex/bench.py +++ b/nesc/whip6/platforms/tools/programmer/ccbsl/intelhex/bench.py @@ -1,6 +1,10 @@ #!/usr/bin/python # (c) Alexander Belchenko, 2007, 2009 +# [2013/08] NOTE: This file is keeping for historical reasons. +# It may or may not work actually with current version of intelhex, +# and most likely it requires some fixes here and there. + """Benchmarking. Run each test 3 times and get median value. @@ -23,13 +27,12 @@ i.e. time increase proportionally to array size. """ -from cStringIO import StringIO import gc import sys import time import intelhex - +from intelhex.compat import StringIO, range_g def median(values): """Return median value for the list of values. @@ -64,7 +67,7 @@ def run_readtest_N_times(func, hexstr, n): """ assert n > 0 times = [] - for i in xrange(n): + for i in range_g(n): sio = StringIO(hexstr) times.append(run_test(func, sio)) sio.close() @@ -79,7 +82,7 @@ def run_writetest_N_times(func, n): """ assert n > 0 times = [] - for i in xrange(n): + for i in range_g(n): sio = StringIO() times.append(run_test(func, sio)) sio.close() @@ -111,11 +114,11 @@ def get_test_data(n1, offset, n2): # make IntelHex object ih = intelhex.IntelHex() addr = 0 - for i in xrange(n1): + for i in range_g(n1): ih[addr] = addr % 256 addr += 1 addr += offset - for i in xrange(n2): + for i in range_g(n2): ih[addr] = addr % 256 addr += 1 # make hex file @@ -126,12 +129,11 @@ def get_test_data(n1, offset, n2): # return n1+n2, hexstr, ih -def get_base_10K(): - """Base 10K""" - return get_test_data(10000, 0, 0) +def get_base_50K(): + return get_test_data(50000, 0, 0) -def get_100K(): - return get_test_data(100000, 0, 0) +def get_250K(): + return get_test_data(250000, 0, 0) def get_100K_100K(): return get_test_data(100000, 1000000, 100000) @@ -148,8 +150,8 @@ class Measure(object): data_set = [ # (data name, getter) - ('base 10K', get_base_10K), # first should be base numbers - ('100K', get_100K), + ('base 50K', get_base_50K), # first should be base numbers + ('250K', get_250K), ('1M', get_1M), ('100K+100K', get_100K_100K), ('0+100K', get_0_100K), @@ -258,7 +260,8 @@ def main(argv=None): if args: raise getopt.GetoptError('Arguments are not used.') - except getopt.GetoptError, msg: + except getopt.GetoptError: + msg = sys.exc_info()[1] # current exception txt = str(msg) print(txt) return 1 diff --git a/nesc/whip6/platforms/tools/programmer/ccbsl/intelhex/compat.py b/nesc/whip6/platforms/tools/programmer/ccbsl/intelhex/compat.py index 3f48350..194cd5d 100644 --- a/nesc/whip6/platforms/tools/programmer/ccbsl/intelhex/compat.py +++ b/nesc/whip6/platforms/tools/programmer/ccbsl/intelhex/compat.py @@ -1,4 +1,5 @@ # Copyright (c) 2011, Bernhard Leiner +# Copyright (c) 2013-2018 Alexander Belchenko # All rights reserved. # # Redistribution and use in source and binary forms, @@ -34,15 +35,19 @@ '''Compatibility functions for python 2 and 3. @author Bernhard Leiner (bleiner AT gmail com) -@version 1.0 +@author Alexander Belchenko (alexander belchenko AT gmail com) ''' __docformat__ = "javadoc" -import sys +import sys, array + if sys.version_info[0] >= 3: + # Python 3 + Python = 3 + def asbytes(s): if isinstance(s, bytes): return s @@ -51,7 +56,104 @@ def asstr(s): if isinstance(s, str): return s return s.decode('latin1') + + array_tobytes = getattr(array.array, "tobytes", array.array.tostring) + + IntTypes = (int,) + StrType = str + UnicodeType = str + + range_g = range # range generator + def range_l(*args): # range list + return list(range(*args)) + + def dict_keys(dikt): # dict keys list + return list(dikt.keys()) + def dict_keys_g(dikt): # dict keys generator + return dikt.keys() + def dict_items_g(dikt): # dict items generator + return dikt.items() + + from io import StringIO, BytesIO + + def get_binary_stdout(): + return sys.stdout.buffer + + def get_binary_stdin(): + return sys.stdin.buffer + else: + # Python 2 + Python = 2 + asbytes = str asstr = str + array_tobytes = array.array.tostring + + IntTypes = (int, long) + StrType = basestring + UnicodeType = unicode + + #range_g = xrange # range generator + def range_g(*args): + # we want to use xrange here but on python 2 it does not work with long ints + try: + return xrange(*args) + except OverflowError: + start = 0 + stop = 0 + step = 1 + n = len(args) + if n == 1: + stop = args[0] + elif n == 2: + start, stop = args + elif n == 3: + start, stop, step = args + else: + raise TypeError('wrong number of arguments in range_g call!') + if step == 0: + raise ValueError('step cannot be zero') + if step > 0: + def up(start, stop, step): + while start < stop: + yield start + start += step + return up(start, stop, step) + else: + def down(start, stop, step): + while start > stop: + yield start + start += step + return down(start, stop, step) + + range_l = range # range list + + def dict_keys(dikt): # dict keys list + return dikt.keys() + def dict_keys_g(dikt): # dict keys generator + return dikt.keys() + def dict_items_g(dikt): # dict items generator + return dikt.items() + + from cStringIO import StringIO + BytesIO = StringIO + + import os + def _force_stream_binary(stream): + """Force binary mode for stream on Windows.""" + if os.name == 'nt': + f_fileno = getattr(stream, 'fileno', None) + if f_fileno: + fileno = f_fileno() + if fileno >= 0: + import msvcrt + msvcrt.setmode(fileno, os.O_BINARY) + return stream + + def get_binary_stdout(): + return _force_stream_binary(sys.stdout) + + def get_binary_stdin(): + return _force_stream_binary(sys.stdin) diff --git a/nesc/whip6/platforms/tools/programmer/ccbsl/intelhex/getsizeof.py b/nesc/whip6/platforms/tools/programmer/ccbsl/intelhex/getsizeof.py new file mode 100644 index 0000000..105e14a --- /dev/null +++ b/nesc/whip6/platforms/tools/programmer/ccbsl/intelhex/getsizeof.py @@ -0,0 +1,64 @@ +# Recursive version sys.getsizeof(). Extendable with custom handlers. +# Code from http://code.activestate.com/recipes/577504/ +# Created by Raymond Hettinger on Fri, 17 Dec 2010 (MIT) + +import sys +from itertools import chain +from collections import deque +try: + from reprlib import repr +except ImportError: + pass + +def total_size(o, handlers={}, verbose=False): + """ Returns the approximate memory footprint an object and all of its contents. + + Automatically finds the contents of the following builtin containers and + their subclasses: tuple, list, deque, dict, set and frozenset. + To search other containers, add handlers to iterate over their contents: + + handlers = {SomeContainerClass: iter, + OtherContainerClass: OtherContainerClass.get_elements} + + """ + dict_handler = lambda d: chain.from_iterable(d.items()) + all_handlers = {tuple: iter, + list: iter, + deque: iter, + dict: dict_handler, + set: iter, + frozenset: iter, + } + all_handlers.update(handlers) # user handlers take precedence + seen = set() # track which object id's have already been seen + default_size = sys.getsizeof(0) # estimate sizeof object without __sizeof__ + + def sizeof(o): + if id(o) in seen: # do not double count the same object + return 0 + seen.add(id(o)) + s = sys.getsizeof(o, default_size) + + if verbose: + print(s, type(o), repr(o))#, file=stderr) + + for typ, handler in all_handlers.items(): + if isinstance(o, typ): + s += sum(map(sizeof, handler(o))) + break + return s + + return sizeof(o) + + +##### Example call ##### + +if __name__ == '__main__': + #d = dict(a=1, b=2, c=3, d=[4,5,6,7], e='a string of chars') + print("dict 3 elements") + d = {0:0xFF, 1:0xEE, 2:0xCC} + print(total_size(d, verbose=True)) + + #print("array 3 elements") + #import array + #print(total_size(array.array('B', b'\x01\x02\x03'))) diff --git a/nesc/whip6/platforms/tools/programmer/ccbsl/intelhex/test.py b/nesc/whip6/platforms/tools/programmer/ccbsl/intelhex/test.py old mode 100755 new mode 100644 index 6d45b3e..50276b0 --- a/nesc/whip6/platforms/tools/programmer/ccbsl/intelhex/test.py +++ b/nesc/whip6/platforms/tools/programmer/ccbsl/intelhex/test.py @@ -1,6 +1,4 @@ -#!/usr/bin/python - -# Copyright (c) 2005-2012, Alexander Belchenko +# Copyright (c) 2005-2018, Alexander Belchenko # All rights reserved. # # Redistribution and use in source and binary forms, @@ -33,37 +31,51 @@ # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, # EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -"""Test suite for IntelHex class.""" +"""Test suite for IntelHex library.""" import array -from cStringIO import StringIO import os +import shlex +import subprocess import sys import tempfile import unittest -from compat import asbytes, asstr import intelhex -from intelhex import IntelHex, \ - IntelHexError, \ - HexReaderError, \ - AddressOverlapError, \ - HexRecordError, \ - RecordLengthError, \ - RecordTypeError, \ - RecordChecksumError, \ - EOFRecordError, \ - ExtendedSegmentAddressRecordError, \ - ExtendedLinearAddressRecordError, \ - StartSegmentAddressRecordError, \ - StartLinearAddressRecordError, \ - DuplicateStartAddressRecordError, \ - InvalidStartAddressValueError, \ - _EndOfFile, \ - BadAccess16bit, \ - hex2bin, \ - Record - +from intelhex import ( + IntelHex, + IntelHexError, + HexReaderError, + AddressOverlapError, + HexRecordError, + RecordLengthError, + RecordTypeError, + RecordChecksumError, + EOFRecordError, + ExtendedSegmentAddressRecordError, + ExtendedLinearAddressRecordError, + StartSegmentAddressRecordError, + StartLinearAddressRecordError, + DuplicateStartAddressRecordError, + InvalidStartAddressValueError, + _EndOfFile, + BadAccess16bit, + hex2bin, + Record, + ) +from intelhex import compat +from intelhex.compat import ( + BytesIO, + StringIO, + UnicodeType, + array_tobytes, + asbytes, + asstr, + dict_items_g, + range_g, + range_l, + ) +from intelhex.__version__ import version_str __docformat__ = 'restructuredtext' @@ -376,7 +388,8 @@ def assertRaisesMsg(self, excClass, msg, callableObj, *args, **kwargs): """ try: callableObj(*args, **kwargs) - except excClass, exc: + except excClass: + exc = sys.exc_info()[1] # current exception excMsg = str(exc) if not msg: # No message provided: any message is fine. @@ -401,7 +414,7 @@ def assertRaisesMsg(self, excClass, msg, callableObj, *args, **kwargs): ) def assertEqualWrittenData(self, a, b): - return self.assertEquals(a, b, """Written data is incorrect + return self.assertEqual(a, b, """Written data is incorrect Should be: %s @@ -422,7 +435,7 @@ def tearDown(self): def test_init_from_file(self): ih = IntelHex(self.f) - for addr in xrange(len(bin8)): + for addr in range_g(len(bin8)): expected = bin8[addr] actual = ih[addr] self.assertEqual(expected, actual, @@ -432,7 +445,7 @@ def test_init_from_file(self): def test_hex_fromfile(self): ih = IntelHex() ih.fromfile(self.f, format='hex') - for addr in xrange(len(bin8)): + for addr in range_g(len(bin8)): expected = bin8[addr] actual = ih[addr] self.assertEqual(expected, actual, @@ -440,10 +453,10 @@ def test_hex_fromfile(self): "%x (%x != %x)" % (addr, expected, actual)) def test_unicode_filename(self): - handle, fname = tempfile.mkstemp(u'') + handle, fname = tempfile.mkstemp(UnicodeType('')) os.close(handle) try: - self.assertTrue(isinstance(fname, unicode)) + self.assertTrue(isinstance(fname, UnicodeType)) f = open(fname, 'w') try: f.write(hex8) @@ -457,10 +470,11 @@ def test_unicode_filename(self): def test_tobinarray_empty(self): ih = IntelHex() - self.assertEqual(array.array('B', []), ih.tobinarray(pad=0xFF)) - self.assertEqual(array.array('B', []), ih.tobinarray(start=0,pad=0xFF)) - self.assertEqual(array.array('B', []), ih.tobinarray(end=2,pad=0xFF)) - self.assertEqual(array.array('B', [255,255,255]), ih.tobinarray(0,2,pad=0xFF)) + ih.padding = 0xFF # set-up explicit padding value and don't use pad parameter + self.assertEqual(array.array('B', []), ih.tobinarray()) + self.assertEqual(array.array('B', []), ih.tobinarray(start=0)) + self.assertEqual(array.array('B', []), ih.tobinarray(end=2)) + self.assertEqual(array.array('B', [255,255,255]), ih.tobinarray(0,2)) def test_tobinarray_with_size(self): ih = IntelHex(self.f) @@ -480,25 +494,41 @@ def test_tobinarray_with_size(self): def test_tobinstr(self): ih = IntelHex(self.f) s1 = ih.tobinstr() - s2 = asstr(bin8.tostring()) + s2 = array_tobytes(bin8) self.assertEqual(s2, s1, "data not equal\n%s\n\n%s" % (s1, s2)) def test_tobinfile(self): ih = IntelHex(self.f) - sio = StringIO() + sio = BytesIO() ih.tobinfile(sio) s1 = sio.getvalue() sio.close() - s2 = asstr(bin8.tostring()) + s2 = array_tobytes(bin8) self.assertEqual(s2, s1, "data not equal\n%s\n\n%s" % (s1, s2)) # new API: .tofile universal method - sio = StringIO() + sio = BytesIO() ih.tofile(sio, format='bin') s1 = sio.getvalue() sio.close() - s2 = asstr(bin8.tostring()) + s2 = array_tobytes(bin8) self.assertEqual(s2, s1, "data not equal\n%s\n\n%s" % (s1, s2)) + def test_tobinfile_realfile(self): + ih = IntelHex(self.f) + tf = tempfile.TemporaryFile(mode='wb') + try: + ih.tobinfile(tf) + finally: + tf.close() + + def test__get_eol_textfile(self): + self.assertEqual('\n', IntelHex._get_eol_textfile('native', 'win32')) + self.assertEqual('\n', IntelHex._get_eol_textfile('native', 'linux')) + self.assertEqual('\n', IntelHex._get_eol_textfile('CRLF', 'win32')) + self.assertEqual('\r\n', IntelHex._get_eol_textfile('CRLF', 'linux')) + self.assertRaisesMsg(ValueError, "wrong eolstyle 'LF'", + IntelHex._get_eol_textfile, 'LF', 'win32') + def test_write_empty_hexfile(self): ih = intelhex.IntelHex() sio = StringIO() @@ -550,61 +580,61 @@ def test_tofile_wrong_format(self): def test_todict(self): ih = IntelHex() - self.assertEquals({}, ih.todict()) + self.assertEqual({}, ih.todict()) ih = IntelHex(StringIO(hex64k)) - self.assertEquals(data64k, ih.todict()) + self.assertEqual(data64k, ih.todict()) ih = IntelHex() ih[1] = 2 ih.start_addr = {'EIP': 1234} - self.assertEquals({1: 2, 'start_addr': {'EIP': 1234}}, ih.todict()) + self.assertEqual({1: 2, 'start_addr': {'EIP': 1234}}, ih.todict()) def test_fromdict(self): ih = IntelHex() ih.fromdict({1:2, 3:4}) - self.assertEquals({1:2, 3:4}, ih.todict()) + self.assertEqual({1:2, 3:4}, ih.todict()) ih.fromdict({1:5, 6:7}) - self.assertEquals({1:5, 3:4, 6:7}, ih.todict()) + self.assertEqual({1:5, 3:4, 6:7}, ih.todict()) ih = IntelHex() ih.fromdict({1: 2, 'start_addr': {'EIP': 1234}}) - self.assertEquals({1: 2, 'start_addr': {'EIP': 1234}}, ih.todict()) + self.assertEqual({1: 2, 'start_addr': {'EIP': 1234}}, ih.todict()) # bad dict self.assertRaises(ValueError, ih.fromdict, {'EIP': 1234}) self.assertRaises(ValueError, ih.fromdict, {-1: 1234}) def test_init_from_obj(self): ih = IntelHex({1:2, 3:4}) - self.assertEquals({1:2, 3:4}, ih.todict()) + self.assertEqual({1:2, 3:4}, ih.todict()) ih.start_addr = {'EIP': 1234} ih2 = IntelHex(ih) ih[1] = 5 ih.start_addr = {'EIP': 5678} - self.assertEquals({1:2, 3:4, 'start_addr': {'EIP': 1234}}, ih2.todict()) + self.assertEqual({1:2, 3:4, 'start_addr': {'EIP': 1234}}, ih2.todict()) self.assertNotEqual(id(ih), id(ih2)) def test_dict_interface(self): ih = IntelHex() - self.assertEquals(0xFF, ih[0]) # padding byte substitution + self.assertEqual(0xFF, ih[0]) # padding byte substitution ih[0] = 1 - self.assertEquals(1, ih[0]) + self.assertEqual(1, ih[0]) del ih[0] - self.assertEquals({}, ih.todict()) # padding byte substitution + self.assertEqual({}, ih.todict()) # padding byte substitution def test_len(self): ih = IntelHex() - self.assertEquals(0, len(ih)) + self.assertEqual(0, len(ih)) ih[2] = 1 - self.assertEquals(1, len(ih)) + self.assertEqual(1, len(ih)) ih[1000] = 2 - self.assertEquals(2, len(ih)) + self.assertEqual(2, len(ih)) def test__getitem__(self): ih = IntelHex() # simple cases - self.assertEquals(0xFF, ih[0]) + self.assertEqual(0xFF, ih[0]) ih[0] = 1 - self.assertEquals(1, ih[0]) + self.assertEqual(1, ih[0]) # big address - self.assertEquals(0xFF, ih[2**32-1]) + self.assertEqual(0xFF, ih[2**32-1]) # wrong addr type/value for indexing operations def getitem(index): return ih[index] @@ -623,20 +653,20 @@ def getitem(index): # full copy via slicing ih2 = ih[:] self.assertTrue(isinstance(ih2, IntelHex)) - self.assertEquals({0:1, 1:2, 2:3, 10:4}, ih2.todict()) + self.assertEqual({0:1, 1:2, 2:3, 10:4}, ih2.todict()) # other slice operations - self.assertEquals({}, ih[3:8].todict()) - self.assertEquals({0:1, 1:2}, ih[0:2].todict()) - self.assertEquals({0:1, 1:2}, ih[:2].todict()) - self.assertEquals({2:3, 10:4}, ih[2:].todict()) - self.assertEquals({0:1, 2:3, 10:4}, ih[::2].todict()) - self.assertEquals({10:4}, ih[3:11].todict()) + self.assertEqual({}, ih[3:8].todict()) + self.assertEqual({0:1, 1:2}, ih[0:2].todict()) + self.assertEqual({0:1, 1:2}, ih[:2].todict()) + self.assertEqual({2:3, 10:4}, ih[2:].todict()) + self.assertEqual({0:1, 2:3, 10:4}, ih[::2].todict()) + self.assertEqual({10:4}, ih[3:11].todict()) def test__setitem__(self): ih = IntelHex() # simple indexing operation ih[0] = 1 - self.assertEquals({0:1}, ih.todict()) + self.assertEqual({0:1}, ih.todict()) # errors def setitem(a,b): ih[a] = b @@ -647,16 +677,16 @@ def setitem(a,b): "Address has unsupported type: %s" % type('foo'), setitem, 'foo', 0) # slice operations - ih[0:4] = range(4) - self.assertEquals({0:0, 1:1, 2:2, 3:3}, ih.todict()) - ih[0:] = range(5,9) - self.assertEquals({0:5, 1:6, 2:7, 3:8}, ih.todict()) - ih[:4] = range(9,13) - self.assertEquals({0:9, 1:10, 2:11, 3:12}, ih.todict()) + ih[0:4] = range_l(4) + self.assertEqual({0:0, 1:1, 2:2, 3:3}, ih.todict()) + ih[0:] = range_l(5,9) + self.assertEqual({0:5, 1:6, 2:7, 3:8}, ih.todict()) + ih[:4] = range_l(9,13) + self.assertEqual({0:9, 1:10, 2:11, 3:12}, ih.todict()) # with step ih = IntelHex() - ih[0:8:2] = range(4) - self.assertEquals({0:0, 2:1, 4:2, 6:3}, ih.todict()) + ih[0:8:2] = range_l(4) + self.assertEqual({0:0, 2:1, 4:2, 6:3}, ih.todict()) # errors in slice operations # ih[1:2] = 'a' self.assertRaisesMsg(ValueError, @@ -683,7 +713,7 @@ def test__delitem__(self): ih = IntelHex() ih[0] = 1 del ih[0] - self.assertEquals({}, ih.todict()) + self.assertEqual({}, ih.todict()) # errors def delitem(addr): del ih[addr] @@ -699,37 +729,100 @@ def delitem(addr): # def ihex(size=8): ih = IntelHex() - for i in xrange(size): + for i in range_g(size): ih[i] = i return ih ih = ihex(8) del ih[:] # delete all data - self.assertEquals({}, ih.todict()) + self.assertEqual({}, ih.todict()) ih = ihex(8) del ih[2:6] - self.assertEquals({0:0, 1:1, 6:6, 7:7}, ih.todict()) + self.assertEqual({0:0, 1:1, 6:6, 7:7}, ih.todict()) ih = ihex(8) del ih[::2] - self.assertEquals({1:1, 3:3, 5:5, 7:7}, ih.todict()) + self.assertEqual({1:1, 3:3, 5:5, 7:7}, ih.todict()) def test_addresses(self): # empty object ih = IntelHex() - self.assertEquals([], ih.addresses()) - self.assertEquals(None, ih.minaddr()) - self.assertEquals(None, ih.maxaddr()) + self.assertEqual([], ih.addresses()) + self.assertEqual(None, ih.minaddr()) + self.assertEqual(None, ih.maxaddr()) + # normal object + ih = IntelHex({1:2, 7:8, 10:0}) + self.assertEqual([1,7,10], ih.addresses()) + self.assertEqual(1, ih.minaddr()) + self.assertEqual(10, ih.maxaddr()) + + def test__get_start_end(self): + # test for private method _get_start_end + # for empty object + ih = IntelHex() + self.assertRaises(intelhex.EmptyIntelHexError, ih._get_start_end) + self.assertRaises(intelhex.EmptyIntelHexError, ih._get_start_end, size=10) + self.assertEqual((0,9), ih._get_start_end(start=0, size=10)) + self.assertEqual((1,10), ih._get_start_end(end=10, size=10)) # normal object ih = IntelHex({1:2, 7:8, 10:0}) - self.assertEquals([1,7,10], ih.addresses()) - self.assertEquals(1, ih.minaddr()) - self.assertEquals(10, ih.maxaddr()) + self.assertEqual((1,10), ih._get_start_end()) + self.assertEqual((1,10), ih._get_start_end(size=10)) + self.assertEqual((0,9), ih._get_start_end(start=0, size=10)) + self.assertEqual((1,10), ih._get_start_end(end=10, size=10)) + def test_segments(self): + # test that address segments are correctly summarized + ih = IntelHex() + sg = ih.segments() + self.assertTrue(isinstance(sg, list)) + self.assertEqual(len(sg), 0) + ih[0x100] = 0 + sg = ih.segments() + self.assertTrue(isinstance(sg, list)) + self.assertEqual(len(sg), 1) + self.assertTrue(isinstance(sg[0], tuple)) + self.assertTrue(len(sg[0]) == 2) + self.assertTrue(sg[0][0] < sg[0][1]) + self.assertEqual(min(sg[0]), 0x100) + self.assertEqual(max(sg[0]), 0x101) + ih[0x101] = 1 + sg = ih.segments() + self.assertTrue(isinstance(sg, list)) + self.assertEqual(len(sg), 1) + self.assertTrue(isinstance(sg[0], tuple)) + self.assertTrue(len(sg[0]) == 2) + self.assertTrue(sg[0][0] < sg[0][1]) + self.assertEqual(min(sg[0]), 0x100) + self.assertEqual(max(sg[0]), 0x102) + ih[0x200] = 2 + ih[0x201] = 3 + ih[0x202] = 4 + sg = ih.segments() + self.assertTrue(isinstance(sg, list)) + self.assertEqual(len(sg), 2) + self.assertTrue(isinstance(sg[0], tuple)) + self.assertTrue(len(sg[0]) == 2) + self.assertTrue(sg[0][0] < sg[0][1]) + self.assertTrue(isinstance(sg[1], tuple)) + self.assertTrue(len(sg[1]) == 2) + self.assertTrue(sg[1][0] < sg[1][1]) + self.assertEqual(min(sg[0]), 0x100) + self.assertEqual(max(sg[0]), 0x102) + self.assertEqual(min(sg[1]), 0x200) + self.assertEqual(max(sg[1]), 0x203) + ih[0x204] = 5 + sg = ih.segments() + self.assertEqual(len(sg), 3) + sg = ih.segments(min_gap=2) + self.assertEqual(len(sg), 2) + self.assertEqual(min(sg[1]), 0x200) + self.assertEqual(max(sg[1]), 0x205) + pass class TestIntelHexLoadBin(TestIntelHexBase): def setUp(self): - self.data = '0123456789' - self.f = StringIO(self.data) + self.bytes = asbytes('0123456789') + self.f = BytesIO(self.bytes) def tearDown(self): self.f.close() @@ -739,28 +832,28 @@ def test_loadbin(self): ih.loadbin(self.f) self.assertEqual(0, ih.minaddr()) self.assertEqual(9, ih.maxaddr()) - self.assertEqual(self.data, ih.tobinstr()) + self.assertEqual(self.bytes, ih.tobinstr()) def test_bin_fromfile(self): ih = IntelHex() ih.fromfile(self.f, format='bin') self.assertEqual(0, ih.minaddr()) self.assertEqual(9, ih.maxaddr()) - self.assertEqual(self.data, ih.tobinstr()) + self.assertEqual(self.bytes, ih.tobinstr()) def test_loadbin_w_offset(self): ih = IntelHex() ih.loadbin(self.f, offset=100) self.assertEqual(100, ih.minaddr()) self.assertEqual(109, ih.maxaddr()) - self.assertEqual(self.data, ih.tobinstr()) + self.assertEqual(self.bytes, ih.tobinstr()) def test_loadfile_format_bin(self): ih = IntelHex() ih.loadfile(self.f, format='bin') self.assertEqual(0, ih.minaddr()) self.assertEqual(9, ih.maxaddr()) - self.assertEqual(self.data, ih.tobinstr()) + self.assertEqual(self.bytes, ih.tobinstr()) class TestIntelHexStartingAddressRecords(TestIntelHexBase): @@ -831,9 +924,9 @@ def tearDown(self): def test_readfile(self): ih = intelhex.IntelHex(self.f) - for addr, byte in data64k.items(): + for addr, byte in dict_items_g(data64k): readed = ih[addr] - self.assertEquals(byte, readed, + self.assertEqual(byte, readed, "data not equal at addr %X " "(%X != %X)" % (addr, byte, readed)) @@ -850,33 +943,77 @@ class TestIntelHexGetPutString(TestIntelHexBase): def setUp(self): self.ih = IntelHex() - for i in xrange(10): + for i in range_g(10): self.ih[i] = i def test_gets(self): - self.assertEquals('\x00\x01\x02\x03\x04\x05\x06\x07', self.ih.gets(0, 8)) - self.assertEquals('\x07\x08\x09', self.ih.gets(7, 3)) + self.assertEqual(asbytes('\x00\x01\x02\x03\x04\x05\x06\x07'), self.ih.gets(0, 8)) + self.assertEqual(asbytes('\x07\x08\x09'), self.ih.gets(7, 3)) self.assertRaisesMsg(intelhex.NotEnoughDataError, 'Bad access at 0x1: ' 'not enough data to read 10 contiguous bytes', self.ih.gets, 1, 10) def test_puts(self): - self.ih.puts(0x03, 'hello') - self.assertEquals('\x00\x01\x02hello\x08\x09', self.ih.gets(0, 10)) + self.ih.puts(0x03, asbytes('hello')) + self.assertEqual(asbytes('\x00\x01\x02hello\x08\x09'), self.ih.gets(0, 10)) def test_getsz(self): - self.assertEquals('', self.ih.getsz(0)) + self.assertEqual(asbytes(''), self.ih.getsz(0)) self.assertRaisesMsg(intelhex.NotEnoughDataError, 'Bad access at 0x1: ' 'not enough data to read zero-terminated string', self.ih.getsz, 1) self.ih[4] = 0 - self.assertEquals('\x01\x02\x03', self.ih.getsz(1)) + self.assertEqual(asbytes('\x01\x02\x03'), self.ih.getsz(1)) def test_putsz(self): - self.ih.putsz(0x03, 'hello') - self.assertEquals('\x00\x01\x02hello\x00\x09', self.ih.gets(0, 10)) + self.ih.putsz(0x03, asbytes('hello')) + self.assertEqual(asbytes('\x00\x01\x02hello\x00\x09'), self.ih.gets(0, 10)) + + def test_find(self): + self.assertEqual(0, self.ih.find(asbytes('\x00\x01\x02\x03\x04\x05\x06'))) + self.assertEqual(0, self.ih.find(asbytes('\x00'))) + self.assertEqual(3, self.ih.find(asbytes('\x03\x04\x05\x06'))) + self.assertEqual(3, self.ih.find(asbytes('\x03'))) + self.assertEqual(7, self.ih.find(asbytes('\x07\x08\x09'))) + self.assertEqual(7, self.ih.find(asbytes('\x07'))) + self.assertEqual(-1, self.ih.find(asbytes('\x0a'))) + self.assertEqual(-1, self.ih.find(asbytes('\x02\x01'))) + self.assertEqual(-1, self.ih.find(asbytes('\x08\x07'))) + + def test_find_start(self): + self.assertEqual(-1, self.ih.find(asbytes('\x00\x01\x02\x03\x04\x05\x06'), start=3)) + self.assertEqual(-1, self.ih.find(asbytes('\x00'), start=3)) + self.assertEqual(3, self.ih.find(asbytes('\x03\x04\x05\x06'), start=3)) + self.assertEqual(3, self.ih.find(asbytes('\x03'), start=3)) + self.assertEqual(7, self.ih.find(asbytes('\x07\x08\x09'), start=3)) + self.assertEqual(7, self.ih.find(asbytes('\x07'), start=3)) + self.assertEqual(-1, self.ih.find(asbytes('\x0a'), start=3)) + self.assertEqual(-1, self.ih.find(asbytes('\x02\x01'), start=3)) + self.assertEqual(-1, self.ih.find(asbytes('\x08\x07'), start=3)) + + def test_find_end(self): + self.assertEqual(-1, self.ih.find(asbytes('\x00\x01\x02\x03\x04\x05\x06'), end=4)) + self.assertEqual(0, self.ih.find(asbytes('\x00'), end=4)) + self.assertEqual(-1, self.ih.find(asbytes('\x03\x04\x05\x06'), end=4)) + self.assertEqual(3, self.ih.find(asbytes('\x03'), end=4)) + self.assertEqual(-1, self.ih.find(asbytes('\x07\x08\x09'), end=4)) + self.assertEqual(-1, self.ih.find(asbytes('\x07'), end=4)) + self.assertEqual(-1, self.ih.find(asbytes('\x0a'), end=4)) + self.assertEqual(-1, self.ih.find(asbytes('\x02\x01'), end=4)) + self.assertEqual(-1, self.ih.find(asbytes('\x08\x07'), end=4)) + + def test_find_start_end(self): + self.assertEqual(-1, self.ih.find(asbytes('\x00\x01\x02\x03\x04\x05\x06'), start=3, end=7)) + self.assertEqual(-1, self.ih.find(asbytes('\x00'), start=3, end=7)) + self.assertEqual(3, self.ih.find(asbytes('\x03\x04\x05\x06'), start=3, end=7)) + self.assertEqual(3, self.ih.find(asbytes('\x03'), start=3, end=7)) + self.assertEqual(-1, self.ih.find(asbytes('\x07\x08\x09'), start=3, end=7)) + self.assertEqual(-1, self.ih.find(asbytes('\x07'), start=3, end=7)) + self.assertEqual(-1, self.ih.find(asbytes('\x0a'), start=3, end=7)) + self.assertEqual(-1, self.ih.find(asbytes('\x02\x01'), start=3, end=7)) + self.assertEqual(-1, self.ih.find(asbytes('\x08\x07'), start=3, end=7)) class TestIntelHexDump(TestIntelHexBase): @@ -885,7 +1022,7 @@ def test_empty(self): ih = IntelHex() sio = StringIO() ih.dump(sio) - self.assertEquals('', sio.getvalue()) + self.assertEqual('', sio.getvalue()) def test_simple(self): ih = IntelHex() @@ -893,14 +1030,14 @@ def test_simple(self): ih[1] = 0x34 sio = StringIO() ih.dump(sio) - self.assertEquals( + self.assertEqual( '0000 12 34 -- -- -- -- -- -- -- -- -- -- -- -- -- -- |.4 |\n', sio.getvalue()) ih[16] = 0x56 ih[30] = 0x98 sio = StringIO() ih.dump(sio) - self.assertEquals( + self.assertEqual( '0000 12 34 -- -- -- -- -- -- -- -- -- -- -- -- -- -- |.4 |\n' '0010 56 -- -- -- -- -- -- -- -- -- -- -- -- -- 98 -- |V . |\n', sio.getvalue()) @@ -911,7 +1048,7 @@ def test_minaddr_not_zero(self): ih[30] = 0x98 sio = StringIO() ih.dump(sio) - self.assertEquals( + self.assertEqual( '0010 56 -- -- -- -- -- -- -- -- -- -- -- -- -- 98 -- |V . |\n', sio.getvalue()) @@ -922,18 +1059,73 @@ def test_start_addr(self): ih.start_addr = {'CS': 0x1234, 'IP': 0x5678} sio = StringIO() ih.dump(sio) - self.assertEquals( + self.assertEqual( 'CS = 0x1234, IP = 0x5678\n' '0000 12 34 -- -- -- -- -- -- -- -- -- -- -- -- -- -- |.4 |\n', sio.getvalue()) ih.start_addr = {'EIP': 0x12345678} sio = StringIO() ih.dump(sio) - self.assertEquals( + self.assertEqual( 'EIP = 0x12345678\n' '0000 12 34 -- -- -- -- -- -- -- -- -- -- -- -- -- -- |.4 |\n', sio.getvalue()) + def test_bad_width(self): + ih = IntelHex() + sio = StringIO() + badwidths = [0, -1, -10.5, 2.5] + for bw in badwidths: + self.assertRaisesMsg(ValueError, "width must be a positive integer.", + ih.dump, sio, bw) + badwidthtypes = ['', {}, [], sio] + for bwt in badwidthtypes: + self.assertRaisesMsg(ValueError, "width must be a positive integer.", + ih.dump, sio, bwt) + + def test_simple_width3(self): + ih = IntelHex() + ih[0] = 0x12 + ih[1] = 0x34 + sio = StringIO() + ih.dump(tofile=sio, width=3) + self.assertEqual( + '0000 12 34 -- |.4 |\n', + sio.getvalue()) + + ih[16] = 0x56 + ih[30] = 0x98 + sio = StringIO() + ih.dump(tofile=sio, width=3) + self.assertEqual( + '0000 12 34 -- |.4 |\n' + '0003 -- -- -- | |\n' + '0006 -- -- -- | |\n' + '0009 -- -- -- | |\n' + '000C -- -- -- | |\n' + '000F -- 56 -- | V |\n' + '0012 -- -- -- | |\n' + '0015 -- -- -- | |\n' + '0018 -- -- -- | |\n' + '001B -- -- -- | |\n' + '001E 98 -- -- |. |\n', + sio.getvalue()) + + def test_minaddr_not_zero_width3_padding(self): + ih = IntelHex() + ih[17] = 0x56 + ih[30] = 0x98 + sio = StringIO() + ih.dump(tofile=sio, width=3, withpadding=True) + self.assertEqual( + '000F FF FF 56 |..V|\n' + '0012 FF FF FF |...|\n' + '0015 FF FF FF |...|\n' + '0018 FF FF FF |...|\n' + '001B FF FF FF |...|\n' + '001E 98 FF FF |...|\n', + sio.getvalue()) + class TestIntelHexMerge(TestIntelHexBase): @@ -941,13 +1133,13 @@ def test_merge_empty(self): ih1 = IntelHex() ih2 = IntelHex() ih1.merge(ih2) - self.assertEquals({}, ih1.todict()) + self.assertEqual({}, ih1.todict()) def test_merge_simple(self): ih1 = IntelHex({0:1, 1:2, 2:3}) ih2 = IntelHex({3:4, 4:5, 5:6}) ih1.merge(ih2) - self.assertEquals({0:1, 1:2, 2:3, 3:4, 4:5, 5:6}, ih1.todict()) + self.assertEqual({0:1, 1:2, 2:3, 3:4, 4:5, 5:6}, ih1.todict()) def test_merge_wrong_args(self): ih1 = IntelHex() @@ -971,29 +1163,29 @@ def test_merge_overlap(self): ih1 = IntelHex({0:1}) ih2 = IntelHex({0:2}) ih1.merge(ih2, overlap='ignore') - self.assertEquals({0:1}, ih1.todict()) + self.assertEqual({0:1}, ih1.todict()) # replace ih1 = IntelHex({0:1}) ih2 = IntelHex({0:2}) ih1.merge(ih2, overlap='replace') - self.assertEquals({0:2}, ih1.todict()) + self.assertEqual({0:2}, ih1.todict()) def test_merge_start_addr(self): # this, None ih1 = IntelHex({'start_addr': {'EIP': 0x12345678}}) ih2 = IntelHex() ih1.merge(ih2) - self.assertEquals({'start_addr': {'EIP': 0x12345678}}, ih1.todict()) + self.assertEqual({'start_addr': {'EIP': 0x12345678}}, ih1.todict()) # None, other ih1 = IntelHex() ih2 = IntelHex({'start_addr': {'EIP': 0x12345678}}) ih1.merge(ih2) - self.assertEquals({'start_addr': {'EIP': 0x12345678}}, ih1.todict()) + self.assertEqual({'start_addr': {'EIP': 0x12345678}}, ih1.todict()) # this == other: no conflict ih1 = IntelHex({'start_addr': {'EIP': 0x12345678}}) ih2 = IntelHex({'start_addr': {'EIP': 0x12345678}}) ih1.merge(ih2) - self.assertEquals({'start_addr': {'EIP': 0x12345678}}, ih1.todict()) + self.assertEqual({'start_addr': {'EIP': 0x12345678}}, ih1.todict()) # this != other: conflict ## overlap=error ih1 = IntelHex({'start_addr': {'EIP': 0x12345678}}) @@ -1005,12 +1197,12 @@ def test_merge_start_addr(self): ih1 = IntelHex({'start_addr': {'EIP': 0x12345678}}) ih2 = IntelHex({'start_addr': {'EIP': 0x87654321}}) ih1.merge(ih2, overlap='ignore') - self.assertEquals({'start_addr': {'EIP': 0x12345678}}, ih1.todict()) + self.assertEqual({'start_addr': {'EIP': 0x12345678}}, ih1.todict()) ## overlap=replace ih1 = IntelHex({'start_addr': {'EIP': 0x12345678}}) ih2 = IntelHex({'start_addr': {'EIP': 0x87654321}}) ih1.merge(ih2, overlap='replace') - self.assertEquals({'start_addr': {'EIP': 0x87654321}}, ih1.todict()) + self.assertEqual({'start_addr': {'EIP': 0x87654321}}, ih1.todict()) class TestIntelHex16bit(TestIntelHexBase): @@ -1029,6 +1221,11 @@ def test_init_from_ih(self): ih = intelhex.IntelHex(self.f) ih16 = intelhex.IntelHex16bit(ih) + def test_default_padding(self): + ih16 = intelhex.IntelHex16bit() + self.assertEqual(0x0FFFF, ih16.padding) + self.assertEqual(0x0FFFF, ih16[0]) + def test_minaddr(self): ih = intelhex.IntelHex16bit(self.f) addr = ih.minaddr() @@ -1088,6 +1285,17 @@ def test_setitem(self): self.assertNotEqual(old, ih[0], "Setting new value to internal buffer failed") + + def test_tobinarray(self): + ih = intelhex.IntelHex16bit() + ih[0] = 0x1234 + ih[1] = 0x5678 + self.assertEqual(array.array('H', [0x1234,0x5678,0xFFFF]), + ih.tobinarray(start=0, end=2)) + # change padding + ih.padding = 0x3FFF + self.assertEqual(array.array('H', [0x1234,0x5678,0x3FFF]), + ih.tobinarray(start=0, end=2)) #/class TestIntelHex16bit @@ -1337,7 +1545,7 @@ class TestHex2Bin(unittest.TestCase): def setUp(self): self.fin = StringIO(hex8) - self.fout = StringIO() + self.fout = BytesIO() def tearDown(self): self.fin.close() @@ -1346,7 +1554,7 @@ def tearDown(self): def test_hex2bin(self): ih = hex2bin(self.fin, self.fout) data = array.array('B', asbytes(self.fout.getvalue())) - for addr in xrange(len(bin8)): + for addr in range_g(len(bin8)): expected = bin8[addr] actual = data[addr] self.assertEqual(expected, actual, @@ -1363,7 +1571,7 @@ def test_simple(self): intelhex.diff_dumps(ih1, ih2, sio) result = sio.getvalue() extra = ' ' - if sys.version_info[0] >= 3: + if sys.version_info[0] >= 3 or sys.version >= '2.7': extra = '' shouldbe = ( "--- a%(extra)s\n" @@ -1374,7 +1582,7 @@ def test_simple(self): "+0010 -- -- -- -- 32 -- -- -- -- -- -- -- -- -- -- -- | 2 |\n" " 0020 -- -- -- -- -- -- -- -- 33 -- -- -- -- -- -- -- | 3 |\n" ) % dict(extra=extra) - self.assertEquals(shouldbe, result) + self.assertEqual(shouldbe, result) class TestBuildRecords(TestIntelHexBase): @@ -1445,6 +1653,146 @@ def test_drive_letter(self): intelhex._get_file_and_addr_range('C:\\filename.hex:0001:000A', True)) +class TestXrangeLongInt(unittest.TestCase): + + def test_xrange_longint(self): + # Bug #1408934: xrange(longint) blows with OverflowError: + if compat.Python == 2: + self.assertRaises(OverflowError, xrange, sys.maxint, sys.maxint+3) + # + upr = compat.range_g(2684625744, 2684625747) + self.assertEqual([2684625744, 2684625745, 2684625746], list(upr)) + upr = compat.range_g(2684625744, 2684625747, 2) + self.assertEqual([2684625744, 2684625746], list(upr)) + # + dnr = compat.range_g(2684625746, 2684625743, -1) + self.assertEqual([2684625746, 2684625745, 2684625744], list(dnr)) + dnr = compat.range_g(2684625746, 2684625743, -2) + self.assertEqual([2684625746, 2684625744], list(dnr)) + + +class TestInSubprocess(unittest.TestCase): + + def runProcessAndGetAsciiStdoutOrStderr(self, cmdline): + if sys.platform != 'win32': + cmdline = shlex.split(cmdline) + p = subprocess.Popen(cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = p.communicate() + retcode = p.poll() + if stdout: + output = stdout.decode('ascii', 'replace') + elif stderr: + output = stderr.decode('ascii', 'replace') + output = output.replace('\r', '') + return retcode, output + + def versionChecker(self, cmdline_template): + cmdline = cmdline_template % sys.executable + retcode, output = self.runProcessAndGetAsciiStdoutOrStderr(cmdline) + self.assertEqual(version_str, output.rstrip()) + self.assertEqual(0, retcode) + + def test_setup_version(self): + self.versionChecker('%s setup.py --version') + + def test_sripts_bin2hex_version(self): + self.versionChecker('%s scripts/bin2hex.py --version') + + def test_sripts_hex2bin_version(self): + self.versionChecker('%s scripts/hex2bin.py --version') + + def test_sripts_hex2dump_version(self): + self.versionChecker('%s scripts/hex2dump.py --version') + + def test_sripts_hexdiff_version(self): + self.versionChecker('%s scripts/hexdiff.py --version') + + def test_sripts_hexmerge_version(self): + self.versionChecker('%s scripts/hexmerge.py --version') + + +class TestWriteHexFileByteCount(unittest.TestCase): + + def setUp(self): + self.f = StringIO(hex8) + + def tearDown(self): + self.f.close() + del self.f + + def test_write_hex_file_bad_byte_count(self): + ih = intelhex.IntelHex(self.f) + sio = StringIO() + self.assertRaises(ValueError, ih.write_hex_file, sio, byte_count=0) + self.assertRaises(ValueError, ih.write_hex_file, sio, byte_count=-1) + self.assertRaises(ValueError, ih.write_hex_file, sio, byte_count=256) + + def test_write_hex_file_byte_count_1(self): + ih = intelhex.IntelHex(self.f) + ih1 = ih[:4] + sio = StringIO() + ih1.write_hex_file(sio, byte_count=1) + s = sio.getvalue() + sio.close() + # check that we have all data records with data length == 1 + self.assertEqual(( + ':0100000002FD\n' + ':0100010005F9\n' + ':01000200A25B\n' + ':01000300E517\n' + ':00000001FF\n' + ), s, + "Written hex is not in byte count 1") + # read back and check content + fin = StringIO(s) + ih2 = intelhex.IntelHex(fin) + self.assertEqual(ih1.tobinstr(), ih2.tobinstr(), + "Written hex file does not equal with original") + + def test_write_hex_file_byte_count_13(self): + ih = intelhex.IntelHex(self.f) + sio = StringIO() + ih.write_hex_file(sio, byte_count=13) + s = sio.getvalue() + # control written hex first line to check that byte count is 13 + sio.seek(0) + self.assertEqual(sio.readline(), + ':0D0000000205A2E576246AF8E6057622786E\n', + "Written hex is not in byte count 13") + sio.close() + + fin = StringIO(s) + ih2 = intelhex.IntelHex(fin) + + self.assertEqual(ih.tobinstr(), ih2.tobinstr(), + "Written hex file does not equal with original") + + def test_write_hex_file_byte_count_255(self): + ih = intelhex.IntelHex(self.f) + sio = StringIO() + ih.write_hex_file(sio, byte_count=255) + s = sio.getvalue() + # control written hex first line to check that byte count is 255 + sio.seek(0) + self.assertEqual(sio.readline(), + (':FF0000000205A2E576246AF8E60576227867300702786AE475F0011204AD02' + '04552000EB7F2ED2008018EF540F2490D43440D4FF30040BEF24BFB41A0050' + '032461FFE57760021577057AE57A7002057930070D7867E475F0011204ADEF' + '02049B02057B7403D2078003E4C207F5768B678A688969E4F577F579F57AE5' + '7760077F2012003E80F57578FFC201C200C202C203C205C206C20812000CFF' + '700D3007057F0012004FAF7AAE7922B4255FC2D5C20412000CFF24D0B40A00' + '501A75F00A787730D50508B6FF0106C6A426F620D5047002D20380D924CFB4' + '1A00EF5004C2E5D20402024FD20180C6D20080C0D20280BCD2D580BAD20580' + 'B47F2012003E20020774010E\n'), + "Written hex is not in byte count 255") + sio.close() + + fin = StringIO(s) + ih2 = intelhex.IntelHex(fin) + + self.assertEqual(ih.tobinstr(), ih2.tobinstr(), + "Written hex file does not equal with original") + ## # MAIN if __name__ == '__main__': diff --git a/nesc/whip6/smake/build_step.py b/nesc/whip6/smake/build_step.py index b3662c4..c794125 100644 --- a/nesc/whip6/smake/build_step.py +++ b/nesc/whip6/smake/build_step.py @@ -45,8 +45,8 @@ def __init__(self, project_root, configs, flags): self.build_dir = self.find_config_value(BUILD_DIR) if not self.build_dir: - print colored('Warning: "build dir" not defined. This is ok, ' - 'if build step does not need one.', 'yellow') + print(colored('Warning: "build dir" not defined. This is ok, ' + 'if build step does not need one.', 'yellow')) self.board = flags.pop(-1) self.target_args = flags.pop(-1) @@ -101,7 +101,7 @@ def call(self, *args, **kwargs): if result != 0: raise BuildError('Command "%s" returned %d' % ( truncate(real_command, 100), result)) - except OSError, e: + except OSError as e: raise BuildError('Failed to execute "%s": %s' % ( real_command, e)) @@ -130,4 +130,4 @@ def copy(self, src, trg): def _show_command(self, command): if 'show_commands' in self.flags: - print colored(command, 'yellow') + print(colored(command, 'yellow')) diff --git a/nesc/whip6/smake/config_tags.py b/nesc/whip6/smake/config_tags.py index 9fd3ff6..e02bece 100644 --- a/nesc/whip6/smake/config_tags.py +++ b/nesc/whip6/smake/config_tags.py @@ -37,7 +37,8 @@ def _comp_targets_typecheck(t): if not isinstance(t, dict): return False if t.values(): - if not isinstance(t.values()[0], list): + elem = next(iter(t.values())) + if not isinstance(elem, list): return False return True diff --git a/nesc/whip6/smake/smake b/nesc/whip6/smake/smake index 3c52b51..513c3fd 100755 --- a/nesc/whip6/smake/smake +++ b/nesc/whip6/smake/smake @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # # whip6: Warsaw High-performance IPv6. # diff --git a/nesc/whip6/smake/smake_impl.py b/nesc/whip6/smake/smake_impl.py index c1105e4..d81d142 100644 --- a/nesc/whip6/smake/smake_impl.py +++ b/nesc/whip6/smake/smake_impl.py @@ -90,7 +90,7 @@ args = parser.parse_args() def fail(msg): - print colored(msg, 'red') + print(colored(msg, 'red')) sys.exit(1) class SMake(object): @@ -109,7 +109,7 @@ def run(self): available_board_names = self._available_board_names() if args.list_boards: for board_name in available_board_names: - print board_name + print(board_name) sys.exit(0) if not args.board: @@ -149,7 +149,7 @@ def run(self): # Verify target argument if target not in direct_targets and target not in composite_targets: - print colored('Unknown target. Choose one of:', 'yellow') + print(colored('Unknown target. Choose one of:', 'yellow')) self._show_board_targets(board) sys.exit(1) @@ -165,7 +165,7 @@ def make(new_target): fail('Target loop detected:') for t in target_history: fail(t), - print '' + print('') sys.exit(1) target_history.append(new_target) @@ -200,7 +200,7 @@ def make(new_target): args.flags) try: build_step.run_step() - except BuildError, e: + except BuildError as e: fail('Target "%s" failed - %s' % (new_target, str(e))) else: @@ -208,10 +208,10 @@ def make(new_target): target_history.pop(-1) targets_ready.add(new_target) - print colored('Target %s complete.' % new_target, 'cyan') + print(colored('Target %s complete.' % new_target, 'cyan')) make(target) # Actually do the job - print colored('All targets completed successfully!', 'green') + print(colored('All targets completed successfully!', 'green')) def _available_board_names(self): config = self._read_config(os.getcwd()) @@ -230,16 +230,16 @@ def _show_board_targets(self, brd): max(len(k) for k in direct_targets.keys())) format_str = '%%%ds - %%s' % (padding + 9) for name in composite_targets.keys(): - print format_str % (colored(name, 'cyan'), ', '.join(com_ip[name])) + print(format_str % (colored(name, 'cyan'), ', '.join(com_ip[name]))) for name in direct_targets.keys(): - print format_str % (colored(name, 'cyan'), dir_ip[name]) + print(format_str % (colored(name, 'cyan'), dir_ip[name])) def _show_boards_and_quit(self, msg): - print '%s. Choose one of:' % msg + print('%s. Choose one of:' % msg) brd_names = self._available_board_names() brd_names.sort() for brd in brd_names: - print colored(brd, 'green'), 'with target:' + print(colored(brd, 'green'), 'with target:') self._show_board_targets(brd) sys.exit(1) @@ -255,8 +255,8 @@ def _find_boards(self): if BOARD in config: if config[BOARD] != os.path.basename(dirpath): - print >>sys.stderr, ('Warning: Board name in config should' - ' match its dir name %s' % dirpath) + print(('Warning: Board name in config should' + ' match its dir name %s' % dirpath), file=sys.stderr) if config[BOARD] in boards: raise ValueError('Conflicting board names %s and %s' % (dirpath, boards[config[BOARD]])) @@ -282,7 +282,7 @@ def _read_config(self, path, suffix=None): body = f.read() spec_dir = os.path.dirname(path) body = body.replace('$(%s)' % SPEC_DIR_VAR, spec_dir) - for key, value in self.constants_in_conf.iteritems(): + for key, value in self.constants_in_conf.items(): body = body.replace('$(%s)' % key, value) # Try replacing $VAR statements with env. variables @@ -297,11 +297,11 @@ def _read_config(self, path, suffix=None): def _recursive_config_list(self, search_roots, suffixes=()): visited = set() suffixes = [None] + list(suffixes) - configs = [[] for i in xrange(len(suffixes))] + configs = [[] for i in range(len(suffixes))] def dfs(u): if not os.path.isdir(u): - print colored('Warning: Package %s does not exist' % u, 'yellow') + print(colored('Warning: Package %s does not exist' % u, 'yellow')) try: visited.add(u) @@ -329,13 +329,13 @@ def dfs(u): configs[i].append(config) if args.show_config_paths: - print u + print(u) except: - print 'Error processing: %s' % os.path.join(u, args.conf_name) + print('Error processing: %s' % os.path.join(u, args.conf_name)) raise if args.show_config_paths: - print 'Dependencie paths:' + print('Dependencie paths:') for root in search_roots: if root not in visited: @@ -354,9 +354,9 @@ def _extract_sub_deps(self, config): paths.append(path) found_p = True if not found_p: - print colored("WARNING: Dependency %s not found in config\n%s" % + print(colored("WARNING: Dependency %s not found in config\n%s" % (p, config[CONF_PATH]), - 'yellow') + 'yellow')) for i in range(len(paths)): if paths[i].endswith('/'): paths[i] = paths[i][:-1] @@ -372,18 +372,18 @@ def _load_direct_targets(self, configs): for target_name in config[DIRECT_TARGETS]: py_file_path = os.path.join(config[CONF_PATH], target_name + '.py') if target_name in targets: - print ('Target "%s" ambiguity between %s and %s' % - (target_name, target_impl_paths[target_name], py_file_path)) + print(('Target "%s" ambiguity between %s and %s' % + (target_name, target_impl_paths[target_name], py_file_path))) sys.exit(1) if not os.path.isfile(py_file_path): - print colored('Target "%s" script file %s not found' % - (target_name, py_file_path), 'yellow') + print(colored('Target "%s" script file %s not found' % + (target_name, py_file_path), 'yellow')) continue sys.path.append(config[CONF_PATH]) - module = __import__(target_name, globals(), locals(), [], -1) + module = __import__(target_name, globals(), locals(), [], 0) targets[target_name] = module.BuildStepImpl target_impl_paths[target_name] = py_file_path return targets, target_impl_paths