diff --git a/chia/_tests/wallet/rpc/test_wallet_rpc.py b/chia/_tests/wallet/rpc/test_wallet_rpc.py index 4baeef7b9ea2..61f92abca7c2 100644 --- a/chia/_tests/wallet/rpc/test_wallet_rpc.py +++ b/chia/_tests/wallet/rpc/test_wallet_rpc.py @@ -2302,8 +2302,6 @@ async def have_nfts() -> bool: # Test with the hex version of nft_id nft_id = (await nft_wallet.get_current_nfts())[0].coin.name().hex() - with pytest.raises(ResponseFailureError, match="Invalid Coin ID format for 'coin_id'"): - await wallet_1_rpc.get_nft_info(NFTGetInfo("error")) nft_info = (await wallet_1_rpc.get_nft_info(NFTGetInfo(nft_id))).nft_info assert nft_info.nft_coin_id == (await nft_wallet.get_current_nfts())[0].coin.name() # Test with the bech32m version of nft_id diff --git a/chia/wallet/wallet_rpc_api.py b/chia/wallet/wallet_rpc_api.py index abc8543dab72..0213c2ffda9b 100644 --- a/chia/wallet/wallet_rpc_api.py +++ b/chia/wallet/wallet_rpc_api.py @@ -28,7 +28,6 @@ from chia.protocols.outbound_message import NodeType from chia.rpc.rpc_server import Endpoint, EndpointResult, default_get_connections from chia.rpc.util import ALL_TRANSLATION_LAYERS, RpcEndpoint, marshal -from chia.server.ws_connection import WSChiaConnection from chia.types.blockchain_format.program import Program from chia.util.bech32m import decode_puzzle_hash, encode_puzzle_hash from chia.util.config import load_config @@ -55,28 +54,16 @@ master_sk_to_pool_sk, match_address_to_sk, ) -from chia.wallet.did_wallet import did_wallet_puzzles -from chia.wallet.did_wallet.did_info import DIDCoinData, DIDInfo, did_recovery_is_nil from chia.wallet.did_wallet.did_wallet import DIDWallet -from chia.wallet.did_wallet.did_wallet_puzzles import ( - DID_INNERPUZ_MOD, - did_program_to_metadata, - match_did_puzzle, - metadata_to_program, -) from chia.wallet.nft_wallet import nft_puzzle_utils -from chia.wallet.nft_wallet.nft_info import NFTCoinInfo, NFTInfo -from chia.wallet.nft_wallet.nft_puzzle_utils import get_metadata_and_phs +from chia.wallet.nft_wallet.nft_info import NFTCoinInfo from chia.wallet.nft_wallet.nft_wallet import NFTWallet -from chia.wallet.nft_wallet.uncurry_nft import UncurriedNFT from chia.wallet.outer_puzzles import AssetType from chia.wallet.puzzle_drivers import PuzzleInfo from chia.wallet.puzzles.clawback.metadata import AutoClaimSettings from chia.wallet.signer_protocol import SigningResponse from chia.wallet.singleton import ( SINGLETON_LAUNCHER_PUZZLE_HASH, - create_singleton_puzzle, - get_inner_puzzle_from_singleton, ) from chia.wallet.trade_record import TradeRecord from chia.wallet.trading.offer import Offer, OfferSummary @@ -84,9 +71,7 @@ from chia.wallet.uncurried_puzzle import uncurry_puzzle from chia.wallet.util.address_type import AddressType, is_valid_address from chia.wallet.util.clvm_streamable import json_serialize_with_clvm_streamable -from chia.wallet.util.compute_hints import compute_spend_hints_and_additions from chia.wallet.util.compute_memos import compute_memos -from chia.wallet.util.curry_and_treehash import NIL_TREEHASH from chia.wallet.util.query_filter import HashFilter from chia.wallet.util.signing import sign_message, verify_signature from chia.wallet.util.transaction_type import CLAWBACK_INCOMING_TRANSACTION_TYPES, TransactionType @@ -743,40 +728,6 @@ async def _convert_tx_puzzle_hash(self, tx: TransactionRecord) -> TransactionRec ), ) - async def get_latest_singleton_coin_spend( - self, peer: WSChiaConnection, coin_id: bytes32, latest: bool = True - ) -> tuple[CoinSpend, CoinState]: - coin_state_list: list[CoinState] = await self.service.wallet_state_manager.wallet_node.get_coin_state( - [coin_id], peer=peer - ) - if coin_state_list is None or len(coin_state_list) < 1: - raise ValueError(f"Coin record 0x{coin_id.hex()} not found") - coin_state: CoinState = coin_state_list[0] - if latest: - # Find the unspent coin - while coin_state.spent_height is not None: - coin_state_list = await self.service.wallet_state_manager.wallet_node.fetch_children( - coin_state.coin.name(), peer=peer - ) - odd_coin = None - for coin in coin_state_list: - if coin.coin.amount % 2 == 1: - if odd_coin is not None: - raise ValueError("This is not a singleton, multiple children coins found.") - odd_coin = coin - if odd_coin is None: - raise ValueError("Cannot find child coin, please wait then retry.") - coin_state = odd_coin - # Get parent coin - parent_coin_state_list: list[CoinState] = await self.service.wallet_state_manager.wallet_node.get_coin_state( - [coin_state.coin.parent_coin_info], peer=peer - ) - if parent_coin_state_list is None or len(parent_coin_state_list) < 1: - raise ValueError(f"Parent coin record 0x{coin_state.coin.parent_coin_info.hex()} not found") - parent_coin_state: CoinState = parent_coin_state_list[0] - coin_spend = await fetch_coin_spend_for_coin_state(parent_coin_state, peer) - return coin_spend, coin_state - ########################################################################################## # Key management ########################################################################################## @@ -2284,36 +2235,21 @@ async def did_get_info(self, request: DIDGetInfo) -> DIDGetInfoResponse: coin_id = decode_puzzle_hash(request.coin_id) else: coin_id = bytes32.from_hexstr(request.coin_id) - # Get coin state - peer = self.service.get_full_node_peer() - coin_spend, coin_state = await self.get_latest_singleton_coin_spend(peer, coin_id, request.latest) - uncurried = uncurry_puzzle(coin_spend.puzzle_reveal) - curried_args = match_did_puzzle(uncurried.mod, uncurried.args) - if curried_args is None: - raise ValueError("The coin is not a DID.") - p2_puzzle, recovery_list_hash, num_verification, singleton_struct, metadata = curried_args - recovery_list_hash_bytes = recovery_list_hash.as_atom() - launcher_id = bytes32(singleton_struct.rest().first().as_atom()) - uncurried_p2 = uncurry_puzzle(p2_puzzle) - (public_key,) = uncurried_p2.args.as_iter() - memos = compute_memos(WalletSpendBundle([coin_spend], G2Element())) - hints = [] - coin_memos = memos.get(coin_state.coin.name()) - if coin_memos is not None: - for memo in coin_memos: - hints.append(memo) + + search_results = await self.service.wallet_state_manager.manual_did_search(coin_id, request.latest) + return DIDGetInfoResponse( - did_id=encode_puzzle_hash(launcher_id, AddressType.DID.hrp(self.service.config)), - latest_coin=coin_state.coin.name(), - p2_address=encode_puzzle_hash(p2_puzzle.get_tree_hash(), AddressType.XCH.hrp(self.service.config)), - public_key=public_key.as_atom(), - recovery_list_hash=bytes32(recovery_list_hash_bytes) if recovery_list_hash_bytes != b"" else None, - num_verification=uint16(num_verification.as_int()), - metadata=did_program_to_metadata(metadata), - launcher_id=launcher_id, - full_puzzle=Program.from_serialized(coin_spend.puzzle_reveal), - solution=Program.from_serialized(coin_spend.solution), - hints=hints, + did_id=encode_puzzle_hash(search_results.launcher_id, AddressType.DID.hrp(self.service.config)), + latest_coin=search_results.latest_coin, + p2_address=encode_puzzle_hash(search_results.p2_puzzle_hash, AddressType.XCH.hrp(self.service.config)), + public_key=bytes(search_results.public_key), + recovery_list_hash=search_results.recovery_list_hash, + num_verification=search_results.num_verification, + metadata=search_results.metadata, + launcher_id=search_results.launcher_id, + full_puzzle=search_results.full_puzzle, + solution=search_results.solution, + hints=search_results.hints, ) @marshal @@ -2329,191 +2265,15 @@ async def did_find_lost_did(self, request: DIDFindLostDID) -> DIDFindLostDIDResp coin_id = decode_puzzle_hash(request.coin_id) else: coin_id = bytes32.from_hexstr(request.coin_id) - # Get coin state - peer = self.service.get_full_node_peer() - coin_spend, coin_state = await self.get_latest_singleton_coin_spend(peer, coin_id) - uncurried = uncurry_puzzle(coin_spend.puzzle_reveal) - curried_args = match_did_puzzle(uncurried.mod, uncurried.args) - if curried_args is None: - raise ValueError("The coin is not a DID.") - p2_puzzle, recovery_list_hash, num_verification, singleton_struct, metadata = curried_args - num_verification_int: uint16 | None = uint16(num_verification.as_int()) - assert num_verification_int is not None - did_data: DIDCoinData = DIDCoinData( - p2_puzzle, - bytes32(recovery_list_hash.as_atom()) if recovery_list_hash != Program.NIL else None, - num_verification_int, - singleton_struct, - metadata, - get_inner_puzzle_from_singleton(coin_spend.puzzle_reveal), - coin_state, - ) - hinted_coins, _ = compute_spend_hints_and_additions(coin_spend) - # Hint is required, if it doesn't have any hint then it should be invalid - hint: bytes32 | None = None - for hinted_coin in hinted_coins.values(): - if hinted_coin.coin.amount % 2 == 1 and hinted_coin.hint is not None: - hint = hinted_coin.hint - break - derivation_record = None - if hint is not None: - derivation_record = ( - await self.service.wallet_state_manager.puzzle_store.get_derivation_record_for_puzzle_hash(hint) - ) - if derivation_record is None: - # This is an invalid DID, check if we are owner - derivation_record = ( - await self.service.wallet_state_manager.puzzle_store.get_derivation_record_for_puzzle_hash( - p2_puzzle.get_tree_hash() - ) - ) - - launcher_id = bytes32(singleton_struct.rest().first().as_atom()) - if derivation_record is None: - raise ValueError(f"This DID {launcher_id} does not belong to the connected wallet") - else: - our_inner_puzzle: Program = self.service.wallet_state_manager.main_wallet.puzzle_for_pk( - derivation_record.pubkey - ) - did_puzzle = DID_INNERPUZ_MOD.curry( - our_inner_puzzle, recovery_list_hash, num_verification, singleton_struct, metadata - ) - full_puzzle = create_singleton_puzzle(did_puzzle, launcher_id) - did_puzzle_empty_recovery = DID_INNERPUZ_MOD.curry( - our_inner_puzzle, NIL_TREEHASH, uint64(0), singleton_struct, metadata - ) - # Check if we have the DID wallet - did_wallet: DIDWallet | None = None - for wallet in self.service.wallet_state_manager.wallets.values(): - if isinstance(wallet, DIDWallet): - assert wallet.did_info.origin_coin is not None - if wallet.did_info.origin_coin.name() == launcher_id: - did_wallet = wallet - break - - full_puzzle_empty_recovery = create_singleton_puzzle(did_puzzle_empty_recovery, launcher_id) - if full_puzzle.get_tree_hash() != coin_state.coin.puzzle_hash: - # It's unclear whether this path is ever reached, and there is no coverage in the DID wallet tests - if full_puzzle_empty_recovery.get_tree_hash() == coin_state.coin.puzzle_hash: - did_puzzle = did_puzzle_empty_recovery - elif ( - did_wallet is not None - and did_wallet.did_info.current_inner is not None - and create_singleton_puzzle(did_wallet.did_info.current_inner, launcher_id).get_tree_hash() - == coin_state.coin.puzzle_hash - ): - # Check if the old wallet has the inner puzzle - did_puzzle = did_wallet.did_info.current_inner - else: - # Try override - if request.recovery_list_hash is not None: - recovery_list_hash = Program.from_bytes(request.recovery_list_hash) - if request.num_verification is not None: - num_verification_int = request.num_verification - if request.metadata is not None: - metadata = metadata_to_program(request.metadata) - did_puzzle = DID_INNERPUZ_MOD.curry( - our_inner_puzzle, recovery_list_hash, num_verification, singleton_struct, metadata - ) - full_puzzle = create_singleton_puzzle(did_puzzle, launcher_id) - matched = True - if full_puzzle.get_tree_hash() != coin_state.coin.puzzle_hash: - matched = False - # Brute force addresses - index = 0 - derivation_record = await self.service.wallet_state_manager.puzzle_store.get_derivation_record( - uint32(index), uint32(1), False - ) - while derivation_record is not None: - our_inner_puzzle = self.service.wallet_state_manager.main_wallet.puzzle_for_pk( - derivation_record.pubkey - ) - did_puzzle = DID_INNERPUZ_MOD.curry( - our_inner_puzzle, recovery_list_hash, num_verification, singleton_struct, metadata - ) - full_puzzle = create_singleton_puzzle(did_puzzle, launcher_id) - if full_puzzle.get_tree_hash() == coin_state.coin.puzzle_hash: - matched = True - break - index += 1 - derivation_record = ( - await self.service.wallet_state_manager.puzzle_store.get_derivation_record( - uint32(index), uint32(1), False - ) - ) - if not matched: - raise RuntimeError( - f"Cannot recover DID {launcher_id} " - f"because the last spend updated recovery_list_hash/num_verification/metadata." - ) - - if did_wallet is None: - # Create DID wallet - response: list[CoinState] = await self.service.get_coin_state([launcher_id], peer=peer) - if len(response) == 0: - raise ValueError(f"Could not find the launch coin with ID: {launcher_id}") - launcher_coin: CoinState = response[0] - did_wallet = await DIDWallet.create_new_did_wallet_from_coin_spend( - self.service.wallet_state_manager, - self.service.wallet_state_manager.main_wallet, - launcher_coin.coin, - did_puzzle, - coin_spend, - f"DID {encode_puzzle_hash(launcher_id, AddressType.DID.hrp(self.service.config))}", - ) - else: - assert did_wallet.did_info.current_inner is not None - if did_wallet.did_info.current_inner.get_tree_hash() != did_puzzle.get_tree_hash(): - # Inner DID puzzle doesn't match, we need to update the DID info - full_solution: Program = Program.from_bytes(bytes(coin_spend.solution)) - inner_solution: Program = full_solution.rest().rest().first() - recovery_list: list[bytes32] = [] - backup_required: int = num_verification.as_int() - if not did_recovery_is_nil(recovery_list_hash): - try: - for did in inner_solution.rest().rest().rest().rest().rest().as_python(): - recovery_list.append(did[0]) - except Exception: - # We cannot recover the recovery list, but it's okay to leave it blank - pass - did_info: DIDInfo = DIDInfo( - did_wallet.did_info.origin_coin, - recovery_list, - uint64(backup_required), - [], - did_puzzle, - None, - None, - None, - False, - json.dumps(did_wallet_puzzles.did_program_to_metadata(metadata)), - ) - await did_wallet.save_info(did_info) - await self.service.wallet_state_manager.update_wallet_puzzle_hashes(did_wallet.wallet_info.id) + await self.service.wallet_state_manager.find_lost_did( + coin_id=coin_id, + override_recovery_list_hash=request.recovery_list_hash, + override_num_verification=request.num_verification, + override_metadata=request.metadata, + ) - try: - coin = await did_wallet.get_coin() - if coin.name() == coin_state.coin.name(): - return DIDFindLostDIDResponse(coin.name()) - except RuntimeError: - # We don't have any coin for this wallet, add the coin - pass - - wallet_id = did_wallet.id() - wallet_type = did_wallet.type() - assert coin_state.created_height is not None - coin_record: WalletCoinRecord = WalletCoinRecord( - coin_state.coin, uint32(coin_state.created_height), uint32(0), False, False, wallet_type, wallet_id - ) - await self.service.wallet_state_manager.coin_store.add_coin_record(coin_record, coin_state.coin.name()) - await did_wallet.coin_added( - coin_state.coin, - uint32(coin_state.created_height), - peer, - did_data, - ) - return DIDFindLostDIDResponse(coin_state.coin.name()) + return DIDFindLostDIDResponse(coin_id) @tx_endpoint(push=True) @marshal @@ -2962,60 +2722,12 @@ async def nft_get_info(self, request: NFTGetInfo) -> NFTGetInfoResponse: if request.coin_id.startswith(AddressType.NFT.hrp(self.service.config)): coin_id = decode_puzzle_hash(request.coin_id) else: - try: - coin_id = bytes32.from_hexstr(request.coin_id) - except ValueError: - raise ValueError(f"Invalid Coin ID format for 'coin_id': {request.coin_id!r}") - # Get coin state - peer = self.service.get_full_node_peer() - coin_spend, coin_state = await self.get_latest_singleton_coin_spend(peer, coin_id, request.latest) - # convert to NFTInfo - # Check if the metadata is updated - full_puzzle: Program = Program.from_bytes(bytes(coin_spend.puzzle_reveal)) - - uncurried_nft: UncurriedNFT | None = UncurriedNFT.uncurry(*full_puzzle.uncurry()) - if uncurried_nft is None: - raise ValueError("The coin is not a NFT.") - metadata, p2_puzzle_hash = get_metadata_and_phs(uncurried_nft, coin_spend.solution) - # Note: This is not the actual unspent NFT full puzzle. - # There is no way to rebuild the full puzzle in a different wallet. - # But it shouldn't have impact on generating the NFTInfo, since inner_puzzle is not used there. - if uncurried_nft.supports_did: - inner_puzzle = nft_puzzle_utils.recurry_nft_puzzle( - uncurried_nft, Program.from_serialized(coin_spend.solution), uncurried_nft.p2_puzzle - ) - else: - inner_puzzle = uncurried_nft.p2_puzzle + coin_id = bytes32.from_hexstr(request.coin_id) + + search_results = await self.service.wallet_state_manager.manual_nft_search(coin_id, request.latest) - full_puzzle = nft_puzzle_utils.create_full_puzzle( - uncurried_nft.singleton_launcher_id, - metadata, - bytes32(uncurried_nft.metadata_updater_hash.as_atom()), - inner_puzzle, - ) - - # Get launcher coin - launcher_coin: list[CoinState] = await self.service.wallet_state_manager.wallet_node.get_coin_state( - [uncurried_nft.singleton_launcher_id], peer=peer - ) - if launcher_coin is None or len(launcher_coin) < 1 or launcher_coin[0].spent_height is None: - raise ValueError(f"Launcher coin record 0x{uncurried_nft.singleton_launcher_id.hex()} not found") - minter_did = await self.service.wallet_state_manager.get_minter_did(launcher_coin[0].coin, peer) - - nft_info: NFTInfo = await nft_puzzle_utils.get_nft_info_from_puzzle( - NFTCoinInfo( - uncurried_nft.singleton_launcher_id, - coin_state.coin, - None, - full_puzzle, - uint32(launcher_coin[0].spent_height), - minter_did, - uint32(coin_state.created_height) if coin_state.created_height else uint32(0), - ), - self.service.wallet_state_manager.config, - ) # This is a bit hacky, it should just come out like this, but this works for this RPC - nft_info = dataclasses.replace(nft_info, p2_address=p2_puzzle_hash) + nft_info = dataclasses.replace(search_results.nft_info, p2_address=search_results.next_p2_puzzle_hash) return NFTGetInfoResponse(nft_info) @tx_endpoint(push=True) diff --git a/chia/wallet/wallet_state_manager.py b/chia/wallet/wallet_state_manager.py index ea13dc93ca01..0f1192ca3b7a 100644 --- a/chia/wallet/wallet_state_manager.py +++ b/chia/wallet/wallet_state_manager.py @@ -3,6 +3,7 @@ import asyncio import contextlib import dataclasses +import json import logging import multiprocessing.context import time @@ -69,10 +70,17 @@ master_sk_to_wallet_sk_intermediate, master_sk_to_wallet_sk_unhardened, ) -from chia.wallet.did_wallet.did_info import DIDCoinData +from chia.wallet.did_wallet.did_info import DIDCoinData, DIDInfo, did_recovery_is_nil from chia.wallet.did_wallet.did_wallet import DIDWallet -from chia.wallet.did_wallet.did_wallet_puzzles import DID_INNERPUZ_MOD, match_did_puzzle +from chia.wallet.did_wallet.did_wallet_puzzles import ( + DID_INNERPUZ_MOD, + did_program_to_metadata, + match_did_puzzle, + metadata_to_program, +) from chia.wallet.key_val_store import KeyValStore +from chia.wallet.nft_wallet import nft_puzzle_utils +from chia.wallet.nft_wallet.nft_info import NFTCoinInfo, NFTInfo from chia.wallet.nft_wallet.nft_puzzle_utils import get_metadata_and_phs, get_new_owner_did from chia.wallet.nft_wallet.nft_wallet import NFTWallet from chia.wallet.nft_wallet.uncurry_nft import NFTCoinData, UncurriedNFT @@ -3006,3 +3014,319 @@ def new_pool_wallet_pubkey(self) -> G1Element: raise ValueError(f"Too many pool wallets ({max_pwi}), cannot create any more on this key.") return master_sk_to_singleton_owner_sk(self.get_master_private_key(), uint32(max_pwi)).get_g1() + + async def get_latest_singleton_coin_spend( + self, peer: WSChiaConnection, coin_id: bytes32, latest: bool = True + ) -> tuple[CoinSpend, CoinState]: + coin_state_list: list[CoinState] = await self.wallet_node.get_coin_state([coin_id], peer=peer) + if coin_state_list is None or len(coin_state_list) < 1: + raise ValueError(f"Coin record 0x{coin_id.hex()} not found") + coin_state: CoinState = coin_state_list[0] + if latest: + # Find the unspent coin + while coin_state.spent_height is not None: + coin_state_list = await self.wallet_node.fetch_children(coin_state.coin.name(), peer=peer) + odd_coin = None + for coin in coin_state_list: + if coin.coin.amount % 2 == 1: + if odd_coin is not None: + raise ValueError("This is not a singleton, multiple children coins found.") + odd_coin = coin + if odd_coin is None: + raise ValueError("Cannot find child coin, please wait then retry.") + coin_state = odd_coin + # Get parent coin + parent_coin_state_list: list[CoinState] = await self.wallet_node.get_coin_state( + [coin_state.coin.parent_coin_info], peer=peer + ) + if parent_coin_state_list is None or len(parent_coin_state_list) < 1: + raise ValueError(f"Parent coin record 0x{coin_state.coin.parent_coin_info.hex()} not found") + parent_coin_state: CoinState = parent_coin_state_list[0] + coin_spend = await fetch_coin_spend_for_coin_state(parent_coin_state, peer) + return coin_spend, coin_state + + async def manual_did_search(self, coin_id: bytes32, latest: bool = True) -> ManualDIDSearchResults: + peer = self.wallet_node.get_full_node_peer() + coin_spend, coin_state = await self.get_latest_singleton_coin_spend(peer, coin_id, latest) + uncurried = uncurry_puzzle(coin_spend.puzzle_reveal) + curried_args = match_did_puzzle(uncurried.mod, uncurried.args) + if curried_args is None: + raise ValueError("The coin is not a DID.") + p2_puzzle, recovery_list_hash, num_verification, singleton_struct, metadata = curried_args + recovery_list_hash_bytes = recovery_list_hash.as_atom() + launcher_id = bytes32(singleton_struct.rest().first().as_atom()) + uncurried_p2 = uncurry_puzzle(p2_puzzle) + (public_key,) = uncurried_p2.args.as_iter() + memos = compute_memos(WalletSpendBundle([coin_spend], G2Element())) + hints = [] + coin_memos = memos.get(coin_state.coin.name()) + if coin_memos is not None: + for memo in coin_memos: + hints.append(memo) + + return ManualDIDSearchResults( + launcher_id=launcher_id, + latest_coin=coin_state.coin.name(), + p2_puzzle_hash=p2_puzzle.get_tree_hash(), + public_key=G1Element.from_bytes(public_key.as_atom()), + recovery_list_hash=bytes32(recovery_list_hash_bytes) if recovery_list_hash_bytes != b"" else None, + num_verification=uint16(num_verification.as_int()), + metadata=did_program_to_metadata(metadata), + full_puzzle=Program.from_serialized(coin_spend.puzzle_reveal), + solution=Program.from_serialized(coin_spend.solution), + hints=hints, + ) + + async def manual_nft_search(self, coin_id: bytes32, latest: bool = True) -> ManualNFTSearchResults: + # Get coin state + peer = self.wallet_node.get_full_node_peer() + coin_spend, coin_state = await self.get_latest_singleton_coin_spend(peer, coin_id, latest) + # convert to NFTInfo + # Check if the metadata is updated + full_puzzle: Program = Program.from_bytes(bytes(coin_spend.puzzle_reveal)) + + uncurried_nft: UncurriedNFT | None = UncurriedNFT.uncurry(*full_puzzle.uncurry()) + if uncurried_nft is None: + raise ValueError("The coin is not a NFT.") + metadata, p2_puzzle_hash = get_metadata_and_phs(uncurried_nft, coin_spend.solution) + # Note: This is not the actual unspent NFT full puzzle. + # There is no way to rebuild the full puzzle in a different wallet. + # But it shouldn't have impact on generating the NFTInfo, since inner_puzzle is not used there. + if uncurried_nft.supports_did: + inner_puzzle = nft_puzzle_utils.recurry_nft_puzzle( + uncurried_nft, Program.from_serialized(coin_spend.solution), uncurried_nft.p2_puzzle + ) + else: + inner_puzzle = uncurried_nft.p2_puzzle + + full_puzzle = nft_puzzle_utils.create_full_puzzle( + uncurried_nft.singleton_launcher_id, + metadata, + bytes32(uncurried_nft.metadata_updater_hash.as_atom()), + inner_puzzle, + ) + + # Get launcher coin + launcher_coin: list[CoinState] = await self.wallet_node.get_coin_state( + [uncurried_nft.singleton_launcher_id], peer=peer + ) + if launcher_coin is None or len(launcher_coin) < 1 or launcher_coin[0].spent_height is None: + raise ValueError(f"Launcher coin record 0x{uncurried_nft.singleton_launcher_id.hex()} not found") + minter_did = await self.get_minter_did(launcher_coin[0].coin, peer) + + return ManualNFTSearchResults( + nft_info=await nft_puzzle_utils.get_nft_info_from_puzzle( + NFTCoinInfo( + uncurried_nft.singleton_launcher_id, + coin_state.coin, + None, + full_puzzle, + uint32(launcher_coin[0].spent_height), + minter_did, + uint32(coin_state.created_height) if coin_state.created_height else uint32(0), + ), + self.config, + ), + next_p2_puzzle_hash=p2_puzzle_hash, + ) + + async def find_lost_did( + self, + *, + coin_id: bytes32, + override_recovery_list_hash: bytes32 | None = None, + override_num_verification: uint16 | None = None, + override_metadata: dict[str, str] | None = None, + ) -> None: + # Get coin state + peer = self.wallet_node.get_full_node_peer() + coin_spend, coin_state = await self.get_latest_singleton_coin_spend(peer, coin_id) + uncurried = uncurry_puzzle(coin_spend.puzzle_reveal) + curried_args = match_did_puzzle(uncurried.mod, uncurried.args) + if curried_args is None: + raise ValueError("The coin is not a DID.") + p2_puzzle, recovery_list_hash, num_verification, singleton_struct, metadata = curried_args + num_verification_int: uint16 | None = uint16(num_verification.as_int()) + assert num_verification_int is not None + did_data: DIDCoinData = DIDCoinData( + p2_puzzle, + bytes32(recovery_list_hash.as_atom()) if recovery_list_hash != Program.NIL else None, + num_verification_int, + singleton_struct, + metadata, + get_inner_puzzle_from_singleton(coin_spend.puzzle_reveal), + coin_state, + ) + hinted_coins, _ = compute_spend_hints_and_additions(coin_spend) + # Hint is required, if it doesn't have any hint then it should be invalid + hint: bytes32 | None = None + for hinted_coin in hinted_coins.values(): + if hinted_coin.coin.amount % 2 == 1 and hinted_coin.hint is not None: + hint = hinted_coin.hint + break + derivation_record = None + if hint is not None: + derivation_record = await self.puzzle_store.get_derivation_record_for_puzzle_hash(hint) + if derivation_record is None: + # This is an invalid DID, check if we are owner + derivation_record = await self.puzzle_store.get_derivation_record_for_puzzle_hash(p2_puzzle.get_tree_hash()) + + launcher_id = bytes32(singleton_struct.rest().first().as_atom()) + if derivation_record is None: + raise ValueError(f"This DID {launcher_id} does not belong to the connected wallet") + else: + our_inner_puzzle: Program = self.main_wallet.puzzle_for_pk(derivation_record.pubkey) + did_puzzle = DID_INNERPUZ_MOD.curry( + our_inner_puzzle, recovery_list_hash, num_verification, singleton_struct, metadata + ) + full_puzzle = create_singleton_puzzle(did_puzzle, launcher_id) + did_puzzle_empty_recovery = DID_INNERPUZ_MOD.curry( + our_inner_puzzle, NIL_TREEHASH, uint64(0), singleton_struct, metadata + ) + # Check if we have the DID wallet + did_wallet: DIDWallet | None = None + for wallet in self.wallets.values(): + if isinstance(wallet, DIDWallet): + assert wallet.did_info.origin_coin is not None + if wallet.did_info.origin_coin.name() == launcher_id: + did_wallet = wallet + break + + full_puzzle_empty_recovery = create_singleton_puzzle(did_puzzle_empty_recovery, launcher_id) + if full_puzzle.get_tree_hash() != coin_state.coin.puzzle_hash: + # It's unclear whether this path is ever reached, and there is no coverage in the DID wallet tests + if full_puzzle_empty_recovery.get_tree_hash() == coin_state.coin.puzzle_hash: + did_puzzle = did_puzzle_empty_recovery + elif ( + did_wallet is not None + and did_wallet.did_info.current_inner is not None + and create_singleton_puzzle(did_wallet.did_info.current_inner, launcher_id).get_tree_hash() + == coin_state.coin.puzzle_hash + ): + # Check if the old wallet has the inner puzzle + did_puzzle = did_wallet.did_info.current_inner + else: + # Try override + if override_recovery_list_hash is not None: + recovery_list_hash = Program.from_bytes(override_recovery_list_hash) + if override_num_verification is not None: + num_verification_int = override_num_verification + if override_metadata is not None: + metadata = metadata_to_program(override_metadata) + did_puzzle = DID_INNERPUZ_MOD.curry( + our_inner_puzzle, recovery_list_hash, num_verification, singleton_struct, metadata + ) + full_puzzle = create_singleton_puzzle(did_puzzle, launcher_id) + matched = True + if full_puzzle.get_tree_hash() != coin_state.coin.puzzle_hash: + matched = False + # Brute force addresses + index = 0 + derivation_record = await self.puzzle_store.get_derivation_record( + uint32(index), uint32(1), False + ) + while derivation_record is not None: + our_inner_puzzle = self.main_wallet.puzzle_for_pk(derivation_record.pubkey) + did_puzzle = DID_INNERPUZ_MOD.curry( + our_inner_puzzle, recovery_list_hash, num_verification, singleton_struct, metadata + ) + full_puzzle = create_singleton_puzzle(did_puzzle, launcher_id) + if full_puzzle.get_tree_hash() == coin_state.coin.puzzle_hash: + matched = True + break + index += 1 + derivation_record = await self.puzzle_store.get_derivation_record( + uint32(index), uint32(1), False + ) + + if not matched: + raise RuntimeError( + f"Cannot recover DID {launcher_id} " + f"because the last spend updated recovery_list_hash/num_verification/metadata." + ) + + if did_wallet is None: + # Create DID wallet + response: list[CoinState] = await self.wallet_node.get_coin_state([launcher_id], peer=peer) + if len(response) == 0: + raise ValueError(f"Could not find the launch coin with ID: {launcher_id}") + launcher_coin: CoinState = response[0] + did_wallet = await DIDWallet.create_new_did_wallet_from_coin_spend( + self, + self.main_wallet, + launcher_coin.coin, + did_puzzle, + coin_spend, + f"DID {encode_puzzle_hash(launcher_id, AddressType.DID.hrp(self.config))}", + ) + else: + assert did_wallet.did_info.current_inner is not None + if did_wallet.did_info.current_inner.get_tree_hash() != did_puzzle.get_tree_hash(): + # Inner DID puzzle doesn't match, we need to update the DID info + full_solution: Program = Program.from_bytes(bytes(coin_spend.solution)) + inner_solution: Program = full_solution.rest().rest().first() + recovery_list: list[bytes32] = [] + backup_required: int = num_verification.as_int() + if not did_recovery_is_nil(recovery_list_hash): + try: + for did in inner_solution.rest().rest().rest().rest().rest().as_python(): + recovery_list.append(did[0]) + except Exception: + # We cannot recover the recovery list, but it's okay to leave it blank + pass + did_info: DIDInfo = DIDInfo( + did_wallet.did_info.origin_coin, + recovery_list, + uint64(backup_required), + [], + did_puzzle, + None, + None, + None, + False, + json.dumps(did_program_to_metadata(metadata)), + ) + await did_wallet.save_info(did_info) + await self.update_wallet_puzzle_hashes(did_wallet.wallet_info.id) + + try: + coin = await did_wallet.get_coin() + if coin.name() == coin_state.coin.name(): + return + except RuntimeError: + # We don't have any coin for this wallet, add the coin + pass + + wallet_id = did_wallet.id() + wallet_type = did_wallet.type() + assert coin_state.created_height is not None + coin_record: WalletCoinRecord = WalletCoinRecord( + coin_state.coin, uint32(coin_state.created_height), uint32(0), False, False, wallet_type, wallet_id + ) + await self.coin_store.add_coin_record(coin_record, coin_state.coin.name()) + await did_wallet.coin_added( + coin_state.coin, + uint32(coin_state.created_height), + peer, + did_data, + ) + + +@dataclasses.dataclass(kw_only=True, frozen=True) +class ManualDIDSearchResults: + launcher_id: bytes32 + latest_coin: bytes32 + p2_puzzle_hash: bytes32 + public_key: G1Element + recovery_list_hash: bytes32 | None + num_verification: uint16 + metadata: dict[str, str] + full_puzzle: Program + solution: Program + hints: list[bytes] + + +@dataclasses.dataclass(kw_only=True, frozen=True) +class ManualNFTSearchResults: + nft_info: NFTInfo + next_p2_puzzle_hash: bytes32