Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ build/
dist/
ledgerblue.egg-info/
__pycache__
.python-version

__version__.py
19 changes: 19 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v3.2.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-added-large-files
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.3.7
hooks:
# Run the linter.
- id: ruff
args: [ --fix ]
# Run the formatter.
- id: ruff-format
22 changes: 12 additions & 10 deletions doc/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,37 @@

from ledgerblue.__version__ import __version__


def setup(app):
app.add_css_file('theme_overrides.css') # Override wide tables in RTD theme
app.add_css_file("theme_overrides.css") # Override wide tables in RTD theme


# General Configuration
# =====================

extensions = []

source_suffix = ['.rst']
source_suffix = [".rst"]

master_doc = 'index'
master_doc = "index"

project = u'BOLOS Python Loader'
copyright = u'2017, Ledger Team'
author = u'Ledger Team'
project = "BOLOS Python Loader"
copyright = "2017, Ledger Team"
author = "Ledger Team"

version = __version__
release = __version__

pygments_style = 'sphinx'
pygments_style = "sphinx"

# Options for HTML Output
# =======================

html_theme = 'sphinx_rtd_theme'
html_theme = "sphinx_rtd_theme"

html_static_path = ['_static']
html_static_path = ["_static"]

# sphinxarg
# =========

extensions += ['sphinxarg.ext']
extensions += ["sphinxarg.ext"]
85 changes: 60 additions & 25 deletions ledgerblue/BleComm.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,34 @@

TAG_ID = b"\x05"


def get_argparser():
parser = argparse.ArgumentParser(description="Manage ledger ble devices.")
parser.add_argument("--show", help="Show currently selected ledger device.", action='store_true')
parser.add_argument("--demo", help="Get version demo (connect to ble device, send get version, print response and disconnect).", action='store_true')
parser.add_argument(
"--show", help="Show currently selected ledger device.", action="store_true"
)
parser.add_argument(
"--demo",
help="Get version demo (connect to ble device, send get version, print response and disconnect).",
action="store_true",
)
return parser


class NoLedgerDeviceDetected(Exception):
pass


class BleScanner(object):
def __init__(self):
self.devices = []

def __scan_callback(self, device: BLEDevice, advertisement_data: AdvertisementData):
if LEDGER_SERVICE_UUID_STAX in advertisement_data.service_uuids or LEDGER_SERVICE_UUID_NANOX in advertisement_data.service_uuids or LEDGER_SERVICE_UUID_EUROPA in advertisement_data.service_uuids:
if (
LEDGER_SERVICE_UUID_STAX in advertisement_data.service_uuids
or LEDGER_SERVICE_UUID_NANOX in advertisement_data.service_uuids
or LEDGER_SERVICE_UUID_EUROPA in advertisement_data.service_uuids
):
device_is_in_list = False
for dev in self.devices:
if device.address == dev[0]:
Expand All @@ -45,12 +58,15 @@ async def scan(self):
counter += 1
await scanner.stop()


queue: asyncio.Queue = asyncio.Queue()


def callback(sender, data):
response = bytes(data)
queue.put_nowait(response)


async def _get_client(address: str) -> BleakClient:
# Connect to client
client = BleakClient(address)
Expand All @@ -60,7 +76,11 @@ async def _get_client(address: str) -> BleakClient:
characteristic_write_with_rsp = None
characteristic_write_cmd = None
for service in client.services:
if service.uuid in [LEDGER_SERVICE_UUID_NANOX, LEDGER_SERVICE_UUID_STAX, LEDGER_SERVICE_UUID_EUROPA]:
if service.uuid in [
LEDGER_SERVICE_UUID_NANOX,
LEDGER_SERVICE_UUID_STAX,
LEDGER_SERVICE_UUID_EUROPA,
]:
for char in service.characteristics:
if "0001" in char.uuid:
characteristic_notify = char
Expand All @@ -86,44 +106,45 @@ async def _get_client(address: str) -> BleakClient:
# Get MTU value
await client.write_gatt_char(characteristic_write.uuid, bytes.fromhex("0800000000"))
response = await queue.get()
mtu = int.from_bytes(response[5:6], 'big')
mtu = int.from_bytes(response[5:6], "big")

print("[BLE] MTU {:d}".format(mtu))

return client, mtu, characteristic_write


async def _read(mtu) -> bytes:
response = await queue.get()
assert len(response) >= 5
assert response[0] == TAG_ID[0]
sequence = int.from_bytes(response[1:3], 'big')
sequence = int.from_bytes(response[1:3], "big")
assert sequence == 0
total_size = int.from_bytes(response[3:5], "big")

total_size_bkup = total_size
if total_size >= (mtu-5):
if total_size >= (mtu - 5):
apdu = response[5:mtu]
total_size -= (mtu-5)
total_size -= mtu - 5

response = await queue.get()
assert response[0] == TAG_ID[0]
assert len(response) >= 3
next_sequence = int.from_bytes(response[1:3], 'big')
assert next_sequence == sequence+1
next_sequence = int.from_bytes(response[1:3], "big")
assert next_sequence == sequence + 1
sequence = next_sequence

while total_size >= (mtu-3):
while total_size >= (mtu - 3):
apdu += response[3:mtu]
total_size -= (mtu-3)
total_size -= mtu - 3
response = await queue.get()
assert response[0] == TAG_ID[0]
assert len(response) >= 3
sequence = int.from_bytes(response[1:3], 'big')
assert next_sequence == sequence+1
sequence = int.from_bytes(response[1:3], "big")
assert next_sequence == sequence + 1
sequence = next_sequence

if total_size > 0:
apdu += response[3:3+total_size]
apdu += response[3 : 3 + total_size]
else:
apdu = response[5:]

Expand Down Expand Up @@ -162,7 +183,9 @@ def __init__(self, address):

def open(self):
self.loop = asyncio.new_event_loop()
self.client, self.mtu, self.write_characteristic = self.loop.run_until_complete(_get_client(self.address))
self.client, self.mtu, self.write_characteristic = self.loop.run_until_complete(
_get_client(self.address)
)
self.opened = True

def close(self):
Expand All @@ -172,7 +195,9 @@ def close(self):
self.loop.close()

def __write(self, data: bytes):
self.loop.run_until_complete(_write(self.client, data, self.write_characteristic, self.mtu))
self.loop.run_until_complete(
_write(self.client, data, self.write_characteristic, self.mtu)
)

def __read(self) -> bytes:
return self.loop.run_until_complete(_read(self.mtu))
Expand All @@ -181,18 +206,25 @@ def exchange(self, data: bytes, timeout=1000) -> bytes:
self.__write(data)
return self.__read()


if __name__ == "__main__":
args = get_argparser().parse_args()
try:
if args.show:
print(f"Environment variable LEDGER_BLE_MAC currently set to '{os.environ['LEDGER_BLE_MAC']}'")
print(
f"Environment variable LEDGER_BLE_MAC currently set to '{os.environ['LEDGER_BLE_MAC']}'"
)
elif args.demo:
try:
ble_device = BleDevice(os.environ['LEDGER_BLE_MAC'])
ble_device = BleDevice(os.environ["LEDGER_BLE_MAC"])
except Exception as ex:
print(f"Please run 'python -m ledgerblue.BleComm' to select wich device to connect to")
print(
f"Please run 'python -m ledgerblue.BleComm' to select wich device to connect to"
)
raise ex
print("-----------------------------Get version BLE demo------------------------------")
print(
"-----------------------------Get version BLE demo------------------------------"
)
ble_device.open()
print(f"Connected to {ble_device.address}")
get_version_apdu = bytes.fromhex("e001000000")
Expand All @@ -201,22 +233,25 @@ def exchange(self, data: bytes, timeout=1000) -> bytes:
print(f"[BLE] <= {result.hex()}")
ble_device.close()
print(f"Disconnected from {ble_device.address}")
print(79*"-")
print(79 * "-")
else:
scanner = BleScanner()
asyncio.run(scanner.scan())
devices_str = ""
device_idx = 0
if len(scanner.devices):
for device in scanner.devices:
devices_str += f" -{device_idx+1}- mac=\"{device[0]}\" name=\"{device[1]}\"\n"
devices_str += (
f' -{device_idx + 1}- mac="{device[0]}" name="{device[1]}"\n'
)
device_idx += 1
index = int(input(f"Select device by index\n{devices_str}"))
print(f"Please run 'export LEDGER_BLE_MAC=\"{scanner.devices[index-1][0]}\"' to select which device to connect to")
print(
f"Please run 'export LEDGER_BLE_MAC=\"{scanner.devices[index - 1][0]}\"' to select which device to connect to"
)
else:
raise NoLedgerDeviceDetected
except NoLedgerDeviceDetected as ex:
print(ex)
except Exception as ex:
raise ex

Loading
Loading