Source code for matic.plasma.erc_721

from __future__ import annotations

from typing import Callable

from eth_typing import HexAddress

from matic.abstracts import BaseContract
from matic.constants import PlasmaLogEventSignature
from matic.json_types import (
    IPlasmaClientConfig,
    IPlasmaContracts,
    ITransactionOption,
    ITransactionWriteResult,
)
from matic.plasma.plasma_token import PlasmaToken
from matic.utils.web3_side_chain_client import Web3SideChainClient


[docs]class ERC721(PlasmaToken): """ERC-721-compliant token on plasma bridge.""" WITHDRAW_EXIT_SIGNATURE = PlasmaLogEventSignature.ERC_721_WITHDRAW_EVENT_SIG """Withdraw event signature, used for exit methods.""" def __init__( self, token_address: HexAddress, is_parent: bool, client: Web3SideChainClient[IPlasmaClientConfig], contracts: Callable[[], IPlasmaContracts], ): super().__init__( address=token_address, is_parent=is_parent, name='ChildERC721', client=client, get_helper_contracts=contracts, ) @property def predicate(self) -> BaseContract: """Get predicate contract for token.""" return self._fetch_predicate( 'erc721Predicate', 'ERC721Predicate', self.client.config.get('erc_721_predicate'), )
[docs] def get_tokens_count( self, user_address: HexAddress, option: ITransactionOption | None = None ) -> int: """Get tokens count for the user.""" method = self.contract.method('balanceOf', user_address) return self.process_read(method, option)
[docs] def get_token_id_at_index_for_user( self, index: int, user_address: HexAddress, option: ITransactionOption | None = None, ) -> int: """Returns token id on supplied index for user.""" method = self.contract.method('tokenOfOwnerByIndex', user_address, index) return self.process_read(method, option)
[docs] def safe_deposit( self, token_id: int, user_address: HexAddress, private_key: str | None = None, option: ITransactionOption | None = None, ) -> ITransactionWriteResult: """Perform safeTransferFrom.""" self.check_for_root() method = self.contract.method( 'safeTransferFrom', user_address, self.get_helper_contracts().deposit_manager.address, token_id, ) return self.process_write(method, option, private_key)
[docs] def withdraw_start( self, token_id: int, private_key: str | None = None, option: ITransactionOption | None = None, ) -> ITransactionWriteResult: """Initialize withdrawal process.""" self.check_for_child() method = self.contract.method('withdraw', token_id) return self.process_write(method, option, private_key)
[docs] def transfer( self, token_id: int, from_: HexAddress, to: HexAddress, private_key: str | None = None, option: ITransactionOption | None = None, ) -> ITransactionWriteResult: """Perform transfer to another address.""" return self.transfer_erc_721(from_, to, token_id, private_key, option)
[docs] def get_all_tokens( self, user_address: HexAddress, limit: int | None = None ) -> list[int]: """Get all token ids that belong to the given user.""" if limit is None: limit = self.get_tokens_count(user_address) # TODO: should be async return [ self.get_token_id_at_index_for_user(i, user_address) for i in range(limit) ]