|
14 | 14 | """BitBox02"""
|
15 | 15 |
|
16 | 16 | from abc import ABC, abstractmethod
|
| 17 | +from dataclasses import dataclass |
17 | 18 | import os
|
18 | 19 | import enum
|
19 | 20 | import sys
|
|
36 | 37 | try:
|
37 | 38 | from .generated import hww_pb2 as hww
|
38 | 39 | from .generated import system_pb2 as system
|
| 40 | + from .generated import keystore_pb2 as keystore |
39 | 41 | except ModuleNotFoundError:
|
40 | 42 | print("Run `make py` to generate the protobuf messages")
|
41 | 43 | sys.exit()
|
@@ -265,6 +267,15 @@ def set_app_static_privkey(self, privkey: bytes) -> None:
|
265 | 267 | pass
|
266 | 268 |
|
267 | 269 |
|
| 270 | +@dataclass |
| 271 | +class BitBoxConfig: |
| 272 | + """Configuration options""" |
| 273 | + |
| 274 | + # If defined, this is called to enter the mnemonic passphrase on the host. |
| 275 | + # It should return the passphrase, or None if the operation was cancelled. |
| 276 | + enter_mnemonic_passphrase: Optional[Callable[[], Optional[str]]] = None |
| 277 | + |
| 278 | + |
268 | 279 | class BitBoxProtocol(ABC):
|
269 | 280 | """
|
270 | 281 | Class for executing versioned BitBox operations
|
@@ -525,12 +536,13 @@ def cancel_outstanding_request(self) -> None:
|
525 | 536 | class BitBoxCommonAPI:
|
526 | 537 | """Class to communicate with a BitBox device"""
|
527 | 538 |
|
528 |
| - # pylint: disable=too-many-public-methods,too-many-arguments |
| 539 | + # pylint: disable=too-many-public-methods,too-many-arguments,too-many-branches |
529 | 540 | def __init__(
|
530 | 541 | self,
|
531 | 542 | transport: TransportLayer,
|
532 | 543 | device_info: Optional[DeviceInfo],
|
533 | 544 | noise_config: BitBoxNoiseConfig,
|
| 545 | + config: BitBoxConfig = BitBoxConfig(), |
534 | 546 | ):
|
535 | 547 | """
|
536 | 548 | Can raise LibraryVersionOutdatedException. check_min_version() should be called following
|
@@ -577,9 +589,13 @@ def __init__(
|
577 | 589 |
|
578 | 590 | if self.version >= semver.VersionInfo(2, 0, 0):
|
579 | 591 | noise_config.attestation_check(self._perform_attestation())
|
580 |
| - self._bitbox_protocol.unlock_query() |
| 592 | + # Starting with v9.23, we can use the unlock function below after noise pairing. |
| 593 | + if self.version < semver.VersionInfo(9, 23, 0): |
| 594 | + self._bitbox_protocol.unlock_query() |
581 | 595 |
|
582 | 596 | self._bitbox_protocol.noise_connect(noise_config)
|
| 597 | + if self.version >= semver.VersionInfo(9, 23, 0): |
| 598 | + self.unlock(config) |
583 | 599 |
|
584 | 600 | # pylint: disable=too-many-return-statements
|
585 | 601 | def _perform_attestation(self) -> bool:
|
@@ -658,6 +674,47 @@ def _msg_query(
|
658 | 674 | print(response)
|
659 | 675 | return response
|
660 | 676 |
|
| 677 | + def unlock( |
| 678 | + self, |
| 679 | + config: BitBoxConfig, |
| 680 | + ) -> None: |
| 681 | + """ |
| 682 | + Prompt to unlock the device. If already unlocked, nothing happens. If |
| 683 | + `config.enter_mnemonic_passphrase` is defined and the user chooses to enter the passphrase |
| 684 | + on the host, this callback will be called to retrieve the passphrase. |
| 685 | + """ |
| 686 | + # pylint: disable=no-member |
| 687 | + try: |
| 688 | + request = hww.Request() |
| 689 | + request.unlock.CopyFrom( |
| 690 | + keystore.UnlockRequest( |
| 691 | + supports_host_passphrase=config.enter_mnemonic_passphrase is not None, |
| 692 | + ) |
| 693 | + ) |
| 694 | + |
| 695 | + while True: |
| 696 | + response = self._msg_query(request) |
| 697 | + response_type = response.WhichOneof("response") |
| 698 | + |
| 699 | + if ( |
| 700 | + response_type == "unlock_host_info" |
| 701 | + and response.unlock_host_info.type |
| 702 | + == keystore.UnlockRequestHostInfoResponse.InfoType.PASSPHRASE |
| 703 | + ): |
| 704 | + assert config.enter_mnemonic_passphrase is not None |
| 705 | + request = hww.Request() |
| 706 | + request.unlock_host_info.CopyFrom( |
| 707 | + keystore.UnlockHostInfoRequest( |
| 708 | + passphrase=config.enter_mnemonic_passphrase(), |
| 709 | + ) |
| 710 | + ) |
| 711 | + elif response_type == "success": |
| 712 | + break |
| 713 | + else: |
| 714 | + raise Exception("Unexpected response") |
| 715 | + except OSError: |
| 716 | + pass |
| 717 | + |
661 | 718 | def reboot(
|
662 | 719 | self, purpose: "system.RebootRequest.Purpose.V" = system.RebootRequest.Purpose.UPGRADE
|
663 | 720 | ) -> bool:
|
|
0 commit comments