From a71d039541a16dcd092772c74d8ace6e9b55c3a4 Mon Sep 17 00:00:00 2001 From: antazoey Date: Sat, 20 Apr 2024 08:26:06 -0600 Subject: [PATCH] refactor!: drop 3.8 support (#1983) --- .github/workflows/test.yaml | 2 +- README.md | 2 +- setup.py | 8 ++--- src/ape/_cli.py | 21 +++++------ src/ape/api/providers.py | 2 +- src/ape/api/query.py | 8 ++--- src/ape/exceptions.py | 11 +++--- src/ape/logging.py | 11 +++--- src/ape_accounts/accounts.py | 7 ++-- src/ape_cache/query.py | 15 ++++---- src/ape_compile/__init__.py | 4 +-- src/ape_compile/_cli.py | 5 ++- src/ape_console/_cli.py | 8 ++--- src/ape_console/config.py | 4 +-- src/ape_ethereum/_print.py | 17 ++++----- src/ape_ethereum/ecosystem.py | 60 ++++++++++++++++++-------------- src/ape_ethereum/provider.py | 46 ++++++++++++++---------- src/ape_ethereum/transactions.py | 22 ++++++------ src/ape_networks/__init__.py | 6 ++-- src/ape_networks/_cli.py | 4 +-- src/ape_node/provider.py | 16 ++++----- src/ape_node/query.py | 3 +- src/ape_plugins/_cli.py | 10 +++--- src/ape_pm/_cli.py | 3 +- src/ape_pm/compiler.py | 9 ++--- src/ape_test/__init__.py | 10 +++--- src/ape_test/_cli.py | 4 +-- src/ape_test/accounts.py | 10 +++--- src/ape_test/provider.py | 8 ++--- tests/conftest.py | 9 ++--- 30 files changed, 178 insertions(+), 167 deletions(-) diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 276a79524f..bd765ed90a 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -63,7 +63,7 @@ jobs: strategy: matrix: os: [ubuntu-latest, macos-latest] # eventually add `windows-latest` - python-version: [3.8, 3.9, "3.10", "3.11"] + python-version: [3.9, "3.10", "3.11"] env: GITHUB_ACCESS_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/README.md b/README.md index c7840afd6a..28d1d1dfb0 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,7 @@ Read our [academic platform](https://academy.apeworx.io/) will help you master A In the latest release, Ape requires: - Linux or macOS -- Python 3.8 up to 3.11 +- Python 3.9 up to 3.11 - **Windows**: Install Windows Subsystem Linux [(WSL)](https://docs.microsoft.com/en-us/windows/wsl/install) Check your python version in a terminal with `python3 --version`. diff --git a/setup.py b/setup.py index 3f4cf246d0..ebe87b6584 100644 --- a/setup.py +++ b/setup.py @@ -98,13 +98,12 @@ install_requires=[ "click>=8.1.6,<9", "ijson>=3.1.4,<4", - "importlib-metadata", # NOTE: Needed on 3.8 for entry_points `group=` kwarg. "ipython>=8.5.0,<9", "lazyasd>=0.1.4", "packaging>=23.0,<24", "pandas>=1.3.0,<2", "pluggy>=1.3,<2", - "pydantic>=2.5.2,<3", + "pydantic>=2.6.4,<3", "pydantic-settings>=2.0.3,<3", "pytest>=6.0,<8.0", "python-dateutil>=2.8.2,<3", @@ -128,7 +127,7 @@ "eip712>=0.2.7,<0.3", "ethpm-types>=0.6.9,<0.7", "eth_pydantic_types>=0.1.0,<0.2", - "evmchains>=0.0.6,<0.1", + "evmchains>=0.0.7,<0.1", "evm-trace>=0.1.5,<0.2", ], entry_points={ @@ -147,7 +146,7 @@ "ape_pm=ape_pm._cli:cli", ], }, - python_requires=">=3.8,<4", + python_requires=">=3.9,<4", extras_require=extras_require, py_modules=packages_data["__modules__"], license="Apache-2.0", @@ -164,7 +163,6 @@ "Operating System :: MacOS", "Operating System :: POSIX", "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", diff --git a/src/ape/_cli.py b/src/ape/_cli.py index a4d85abfb9..4853d09620 100644 --- a/src/ape/_cli.py +++ b/src/ape/_cli.py @@ -2,10 +2,10 @@ import re import sys from gettext import gettext -from typing import Any, Dict, List, Optional, Tuple +from importlib.metadata import entry_points +from typing import Any, Iterable, Optional import click -import importlib_metadata as metadata import yaml from ape.cli import ape_cli_context @@ -30,7 +30,7 @@ def display_config(ctx, param, value): class ApeCLI(click.MultiCommand): - _commands: Optional[Dict] = None + _commands: Optional[dict] = None _CLI_GROUP_NAME = "ape_cli_subcommands" def format_commands(self, ctx, formatter) -> None: @@ -47,7 +47,7 @@ def format_commands(self, ctx, formatter) -> None: limit = formatter.width - 6 - max(len(cmd[0]) for cmd in commands) # Split the commands into 3 sections. - sections: Dict[str, List[Tuple[str, str]]] = { + sections: dict[str, list[tuple[str, str]]] = { "Core": [], "Plugin": [], "3rd-Party Plugin": [], @@ -114,20 +114,15 @@ def _suggest_cmd(usage_error): raise usage_error @property - def commands(self) -> Dict: + def commands(self) -> dict: if self._commands: return self._commands - entry_points = metadata.entry_points(group=self._CLI_GROUP_NAME) - if not entry_points: - raise Abort("Missing registered CLI subcommands.") - - self._commands = { - clean_plugin_name(entry_point.name): entry_point.load for entry_point in entry_points - } + eps: Iterable = entry_points().get(self._CLI_GROUP_NAME, []) + self._commands = {clean_plugin_name(cmd.name): cmd.load for cmd in eps} return self._commands - def list_commands(self, ctx) -> List[str]: + def list_commands(self, ctx) -> list[str]: return list(sorted(self.commands)) def get_command(self, ctx, name) -> Optional[click.Command]: diff --git a/src/ape/api/providers.py b/src/ape/api/providers.py index 8f0fff8f0f..07cdfbdaa1 100644 --- a/src/ape/api/providers.py +++ b/src/ape/api/providers.py @@ -310,7 +310,7 @@ def network_choice(self) -> str: return f"{self.network.choice}:{self.name}" @abstractmethod - def make_request(self, rpc: str, parameters: Optional[List] = None) -> Any: + def make_request(self, rpc: str, parameters: Optional[Iterable] = None) -> Any: """ Make a raw RPC request to the provider. Advanced featues such as tracing may utilize this to by-pass unnecessary diff --git a/src/ape/api/query.py b/src/ape/api/query.py index 4f9c3eb937..968c22a7eb 100644 --- a/src/ape/api/query.py +++ b/src/ape/api/query.py @@ -1,4 +1,4 @@ -from functools import lru_cache +from functools import cache from typing import Any, Dict, Iterator, List, Optional, Sequence, Set, Type, Union from ethpm_types.abi import BaseModel, EventABI, MethodABI @@ -19,8 +19,7 @@ ] -# TODO: Replace with `functools.cache` when Py3.8 dropped -@lru_cache(maxsize=None) +@cache def _basic_columns(Model: Type[BaseInterfaceModel]) -> Set[str]: columns = set(Model.model_fields) @@ -32,8 +31,7 @@ def _basic_columns(Model: Type[BaseInterfaceModel]) -> Set[str]: return columns -# TODO: Replace with `functools.cache` when Py3.8 dropped -@lru_cache(maxsize=None) +@cache def _all_columns(Model: Type[BaseInterfaceModel]) -> Set[str]: columns = _basic_columns(Model) # NOTE: Iterate down the series of subclasses of `Model` (e.g. Block and BlockAPI) diff --git a/src/ape/exceptions.py b/src/ape/exceptions.py index b5259df666..0e7d3668dc 100644 --- a/src/ape/exceptions.py +++ b/src/ape/exceptions.py @@ -3,10 +3,11 @@ import tempfile import time import traceback +from collections.abc import Collection from inspect import getframeinfo, stack from pathlib import Path from types import CodeType, TracebackType -from typing import TYPE_CHECKING, Any, Collection, Dict, List, Optional, Union, cast +from typing import TYPE_CHECKING, Any, Optional, Union, cast import click from eth_typing import Hash32 @@ -103,7 +104,7 @@ class ArgumentsLengthError(ContractDataError): def __init__( self, arguments_length: int, - inputs: Union[MethodABI, ConstructorABI, int, List, None] = None, + inputs: Union[MethodABI, ConstructorABI, int, list, None] = None, **kwargs, ): prefix = ( @@ -114,7 +115,7 @@ def __init__( super().__init__(f"{prefix}.") return - inputs_ls: List[Union[MethodABI, ConstructorABI, int]] = ( + inputs_ls: list[Union[MethodABI, ConstructorABI, int]] = ( inputs if isinstance(inputs, list) else [inputs] ) if not inputs_ls: @@ -642,7 +643,7 @@ def __init__( super().__init__(provider, *args, **kwargs) -def handle_ape_exception(err: ApeException, base_paths: List[Path]) -> bool: +def handle_ape_exception(err: ApeException, base_paths: list[Path]) -> bool: """ Handle a transaction error by showing relevant stack frames, including custom contract frames added to the exception. @@ -719,7 +720,7 @@ class CustomError(ContractLogicError): def __init__( self, abi: ErrorABI, - inputs: Dict[str, Any], + inputs: dict[str, Any], txn: Optional[FailedTxn] = None, trace: Optional["TraceAPI"] = None, contract_address: Optional["AddressType"] = None, diff --git a/src/ape/logging.py b/src/ape/logging.py index 0d7a8d2fcd..3a5e05fb51 100644 --- a/src/ape/logging.py +++ b/src/ape/logging.py @@ -2,9 +2,10 @@ import logging import sys import traceback +from collections.abc import Sequence from enum import IntEnum from pathlib import Path -from typing import IO, Any, Callable, Dict, Optional, Sequence, Union +from typing import IO, Any, Callable, Optional, Union import click from yarl import URL @@ -73,8 +74,8 @@ def format(self, record): if _isatty(sys.stdout) and _isatty(sys.stderr): # Only color log messages when sys.stdout and sys.stderr are sent to the terminal. level = LogLevel(record.levelno) - default_dict: Dict[str, Any] = {} - styles: Dict[str, Any] = CLICK_STYLE_KWARGS.get(level, default_dict) + default_dict: dict[str, Any] = {} + styles: dict[str, Any] = CLICK_STYLE_KWARGS.get(level, default_dict) record.levelname = click.style(record.levelname, **styles) path = Path(record.pathname) @@ -89,7 +90,7 @@ def format(self, record): class ClickHandler(logging.Handler): def __init__( - self, echo_kwargs: Dict, handlers: Optional[Sequence[Callable[[str], str]]] = None + self, echo_kwargs: dict, handlers: Optional[Sequence[Callable[[str], str]]] = None ): super().__init__() self.echo_kwargs = echo_kwargs @@ -112,7 +113,7 @@ def emit(self, record): class ApeLogger: _mentioned_verbosity_option = False - _extra_loggers: Dict[str, logging.Logger] = {} + _extra_loggers: dict[str, logging.Logger] = {} def __init__( self, diff --git a/src/ape_accounts/accounts.py b/src/ape_accounts/accounts.py index 0e58756d5b..2757a0bc5d 100644 --- a/src/ape_accounts/accounts.py +++ b/src/ape_accounts/accounts.py @@ -1,7 +1,8 @@ import json +from collections.abc import Iterator from os import environ from pathlib import Path -from typing import Any, Dict, Iterator, Optional, Tuple +from typing import Any, Optional import click from eip712.messages import EIP712Message @@ -32,7 +33,7 @@ def __init__(self): class AccountContainer(AccountContainerAPI): - loaded_accounts: Dict[str, "KeyfileAccount"] = {} + loaded_accounts: dict[str, "KeyfileAccount"] = {} @property def _keyfiles(self) -> Iterator[Path]: @@ -292,7 +293,7 @@ def _write_and_return_account(alias: str, passphrase: str, account: LocalAccount def generate_account( alias: str, passphrase: str, hd_path: str = ETHEREUM_DEFAULT_PATH, word_count: int = 12 -) -> Tuple[KeyfileAccount, str]: +) -> tuple[KeyfileAccount, str]: """ Generate a new account. diff --git a/src/ape_cache/query.py b/src/ape_cache/query.py index 327fefdd87..a6eec04d1d 100644 --- a/src/ape_cache/query.py +++ b/src/ape_cache/query.py @@ -1,5 +1,6 @@ +from collections.abc import Iterator from pathlib import Path -from typing import Any, Dict, Iterator, List, Optional, cast +from typing import Any, Optional, cast from sqlalchemy import create_engine, func from sqlalchemy.engine import CursorResult @@ -326,7 +327,7 @@ def _perform_block_query(self, query: BlockQuery) -> Iterator[BlockAPI]: ) @perform_query.register - def _perform_transaction_query(self, query: BlockTransactionQuery) -> Iterator[Dict]: + def _perform_transaction_query(self, query: BlockTransactionQuery) -> Iterator[dict]: with self.database_connection as conn: result = conn.execute( select([Transactions]).where(Transactions.block_hash == query.block_id) @@ -395,7 +396,7 @@ def _cache_update_events_clause(self, query: ContractEventQuery) -> Insert: @singledispatchmethod def _get_cache_data( self, query: QueryType, result: Iterator[BaseInterfaceModel] - ) -> Optional[List[Dict[str, Any]]]: + ) -> Optional[list[dict[str, Any]]]: raise QueryEngineError( """ Not a compatible QueryType. For more details see our docs @@ -406,16 +407,16 @@ def _get_cache_data( @_get_cache_data.register def _get_block_cache_data( self, query: BlockQuery, result: Iterator[BaseInterfaceModel] - ) -> Optional[List[Dict[str, Any]]]: + ) -> Optional[list[dict[str, Any]]]: return [m.model_dump(mode="json", by_alias=False) for m in result] @_get_cache_data.register def _get_block_txns_data( self, query: BlockTransactionQuery, result: Iterator[BaseInterfaceModel] - ) -> Optional[List[Dict[str, Any]]]: + ) -> Optional[list[dict[str, Any]]]: new_result = [] table_columns = [c.key for c in Transactions.__table__.columns] # type: ignore - txns: List[TransactionAPI] = cast(List[TransactionAPI], result) + txns: list[TransactionAPI] = cast(list[TransactionAPI], result) for val in [m for m in txns]: new_dict = { k: v @@ -443,7 +444,7 @@ def _get_block_txns_data( @_get_cache_data.register def _get_cache_events_data( self, query: ContractEventQuery, result: Iterator[BaseInterfaceModel] - ) -> Optional[List[Dict[str, Any]]]: + ) -> Optional[list[dict[str, Any]]]: return [m.model_dump(mode="json", by_alias=False) for m in result] def update_cache(self, query: QueryType, result: Iterator[BaseInterfaceModel]): diff --git a/src/ape_compile/__init__.py b/src/ape_compile/__init__.py index ed9c64653f..ba7fa5b26e 100644 --- a/src/ape_compile/__init__.py +++ b/src/ape_compile/__init__.py @@ -1,5 +1,5 @@ from pathlib import Path -from typing import List, Optional +from typing import Optional from pydantic import field_validator, model_validator @@ -21,7 +21,7 @@ class Config(PluginConfig): should configure ``include_dependencies`` to be ``True``. """ - exclude: List[str] = ["*package.json", "*package-lock.json", "*tsconfig.json"] + exclude: list[str] = ["*package.json", "*package-lock.json", "*tsconfig.json"] """ Source exclusion globs across all file types. """ diff --git a/src/ape_compile/_cli.py b/src/ape_compile/_cli.py index e190acdb56..c57e44a8d8 100644 --- a/src/ape_compile/_cli.py +++ b/src/ape_compile/_cli.py @@ -1,5 +1,4 @@ from pathlib import Path -from typing import Dict, Set import click from ethpm_types import ContractType @@ -38,7 +37,7 @@ def _include_dependencies_callback(ctx, param, value): callback=_include_dependencies_callback, ) @ape_cli_context() -def cli(cli_ctx, file_paths: Set[Path], use_cache: bool, display_size: bool, include_dependencies): +def cli(cli_ctx, file_paths: set[Path], use_cache: bool, display_size: bool, include_dependencies): """ Compiles the manifest for this project and saves the results back to the manifest. @@ -97,7 +96,7 @@ def cli(cli_ctx, file_paths: Set[Path], use_cache: bool, display_size: bool, inc _display_byte_code_sizes(cli_ctx, contract_types) -def _display_byte_code_sizes(cli_ctx, contract_types: Dict[str, ContractType]): +def _display_byte_code_sizes(cli_ctx, contract_types: dict[str, ContractType]): # Display bytecode size for *all* contract types (not just ones we compiled) code_size = [] for contract in contract_types.values(): diff --git a/src/ape_console/_cli.py b/src/ape_console/_cli.py index ffba704d2d..8d69b01231 100644 --- a/src/ape_console/_cli.py +++ b/src/ape_console/_cli.py @@ -6,7 +6,7 @@ from importlib.util import module_from_spec, spec_from_loader from os import environ, getcwd from types import ModuleType -from typing import Any, Dict, cast +from typing import Any, cast import click import IPython @@ -46,7 +46,7 @@ def import_extras_file(file_path) -> ModuleType: return module -def load_console_extras(**namespace: Any) -> Dict[str, Any]: +def load_console_extras(**namespace: Any) -> dict[str, Any]: """load and return namespace updates from ape_console_extras.py files if they exist""" global_extras = ManagerAccessMixin.config_manager.DATA_FOLDER.joinpath(CONSOLE_EXTRAS_FILENAME) @@ -66,7 +66,7 @@ def load_console_extras(**namespace: Any) -> Dict[str, Any]: # Figure out the kwargs the func is looking for and assemble # from the original namespace func_spec = inspect.getfullargspec(ape_init_extras) - init_kwargs: Dict[str, Any] = {k: namespace.get(k) for k in func_spec.args} + init_kwargs: dict[str, Any] = {k: namespace.get(k) for k in func_spec.args} # Execute functionality with existing console namespace as # kwargs. @@ -140,7 +140,7 @@ def console(project=None, verbose=None, extra_locals=None, embed=False): _launch_console(namespace, ipy_config, embed, banner) -def _launch_console(namespace: Dict, ipy_config: IPythonConfig, embed: bool, banner: str): +def _launch_console(namespace: dict, ipy_config: IPythonConfig, embed: bool, banner: str): ipython_kwargs = {"user_ns": namespace, "config": ipy_config} if embed: IPython.embed(**ipython_kwargs, colors="Neutral", banner1=banner) diff --git a/src/ape_console/config.py b/src/ape_console/config.py index e1af9a2764..1290241dc2 100644 --- a/src/ape_console/config.py +++ b/src/ape_console/config.py @@ -1,8 +1,6 @@ -from typing import List - from ape.api import PluginConfig class ConsoleConfig(PluginConfig): - plugins: List[str] = [] + plugins: list[str] = [] """Additional IPython plugins to include in your session.""" diff --git a/src/ape_ethereum/_print.py b/src/ape_ethereum/_print.py index b5e421aa05..edcb4c3918 100644 --- a/src/ape_ethereum/_print.py +++ b/src/ape_ethereum/_print.py @@ -19,11 +19,12 @@ - Discussion on dynamic ABI encoding (Vyper-style) for log calls: https://github.com/NomicFoundation/hardhat/issues/2666 # noqa: E501 """ -from typing import Any, Iterable, Tuple, cast +from collections.abc import Iterable +from typing import Any, cast from eth_abi import decode -from eth_typing import ChecksumAddress -from eth_utils import decode_hex +from eth_typing import ChecksumAddress, HexStr +from eth_utils import add_0x_prefix, decode_hex from ethpm_types import ContractType, MethodABI from evm_trace import CallTreeNode from hexbytes import HexBytes @@ -65,24 +66,24 @@ def is_vyper_print(call: CallTreeNode) -> TypeGuard[CallTreeNode]: ) -def console_log(method_abi: MethodABI, calldata: str) -> Tuple[Any]: +def console_log(method_abi: MethodABI, calldata: str) -> tuple[Any]: """Return logged data for console.log() calls""" bcalldata = decode_hex(calldata) data = ape.networks.ethereum.decode_calldata(method_abi, bcalldata) return tuple(data.values()) -def vyper_print(calldata: HexBytes) -> Tuple[Any]: +def vyper_print(calldata: str) -> tuple[Any]: """Return logged data for print() calls""" - schema, payload = decode(["string", "bytes"], calldata) + schema, payload = decode(["string", "bytes"], HexBytes(calldata)) data = decode(schema.strip("()").split(","), payload) return tuple(data) -def extract_debug_logs(call: CallTreeNode) -> Iterable[Tuple[Any]]: +def extract_debug_logs(call: CallTreeNode) -> Iterable[tuple[Any]]: """Filter calls to console.log() and print() from a transactions call tree""" if is_vyper_print(call) and call.calldata is not None: - yield vyper_print(call.calldata[4:]) + yield vyper_print(add_0x_prefix(HexStr(call.calldata[4:].hex()))) elif is_console_log(call) and call.calldata is not None: method_abi = console_contract.identifier_lookup.get(call.calldata[:4].hex()) diff --git a/src/ape_ethereum/ecosystem.py b/src/ape_ethereum/ecosystem.py index dbeb5e842d..3270897bfa 100644 --- a/src/ape_ethereum/ecosystem.py +++ b/src/ape_ethereum/ecosystem.py @@ -1,7 +1,8 @@ import re +from collections.abc import Iterator, Sequence from decimal import Decimal from functools import cached_property -from typing import Any, ClassVar, Dict, Iterator, List, Optional, Sequence, Tuple, Type, Union, cast +from typing import Any, ClassVar, Optional, Union, cast from eth_abi import decode, encode from eth_abi.exceptions import InsufficientDataBytes, NonEmptyPaddingBytes @@ -155,7 +156,7 @@ def create_local_network_config( def create_network_config( required_confirmations: int = 2, base_fee_multiplier: float = DEFAULT_LIVE_NETWORK_BASE_FEE_MULTIPLIER, - cls: Type = NetworkConfig, + cls: type = NetworkConfig, **kwargs, ) -> NetworkConfig: return cls( @@ -172,18 +173,18 @@ class BaseEthereumConfig(PluginConfig): DEFAULT_TRANSACTION_TYPE: ClassVar[int] = TransactionType.DYNAMIC.value DEFAULT_LOCAL_GAS_LIMIT: ClassVar[GasLimit] = "max" - NETWORKS: ClassVar[Dict[str, Tuple[int, int]]] = NETWORKS + NETWORKS: ClassVar[dict[str, tuple[int, int]]] = NETWORKS default_network: str = LOCAL_NETWORK_NAME - _forked_configs: Dict[str, ForkedNetworkConfig] = {} - _custom_networks: Dict[str, NetworkConfig] = {} + _forked_configs: dict[str, ForkedNetworkConfig] = {} + _custom_networks: dict[str, NetworkConfig] = {} model_config = SettingsConfigDict(extra="allow") @model_validator(mode="before") @classmethod def load_network_configs(cls, values): - cfg_forks: Dict[str, ForkedNetworkConfig] = {} + cfg_forks: dict[str, ForkedNetworkConfig] = {} custom_networks = {} for name, obj in values.items(): if name.startswith("_"): @@ -292,7 +293,7 @@ class Block(BlockAPI): base_fee: int = Field(0, alias="baseFeePerGas") difficulty: int = 0 total_difficulty: int = Field(0, alias="totalDifficulty") - uncles: List[HexBytes] = [] + uncles: list[HexBytes] = [] # Type re-declares. hash: Optional[HexBytes] = None @@ -323,9 +324,14 @@ def size(self) -> int: # (normal). return self._size + number = self.number + if number is None: + # Not sure. + raise APINotImplementedError() + # Try to get it from the provider. if provider := self.network_manager.active_provider: - block = provider.get_block(self.number) + block = provider.get_block(number) size = block._size if size is not None and size > -1: self._size = size @@ -369,7 +375,7 @@ def decode_address(cls, raw_address: RawAddress) -> AddressType: def encode_address(cls, address: AddressType) -> RawAddress: return str(address) - def decode_transaction_type(self, transaction_type_id: Any) -> Type[TransactionAPI]: + def decode_transaction_type(self, transaction_type_id: Any) -> type[TransactionAPI]: if isinstance(transaction_type_id, TransactionType): tx_type = transaction_type_id elif isinstance(transaction_type_id, int): @@ -497,7 +503,7 @@ def str_to_slot(text): return None - def decode_receipt(self, data: Dict) -> ReceiptAPI: + def decode_receipt(self, data: dict) -> ReceiptAPI: status = data.get("status") if status is not None: status = self.conversion_manager.convert(status, int) @@ -536,7 +542,7 @@ def decode_receipt(self, data: Dict) -> ReceiptAPI: transaction=self.create_transaction(**data), ) - receipt_cls: Type[Receipt] + receipt_cls: type[Receipt] if any( x in data for x in ("blobGasPrice", "blobGasUsed", "blobVersionedHashes", "maxFeePerBlobGas") @@ -549,7 +555,7 @@ def decode_receipt(self, data: Dict) -> ReceiptAPI: return receipt_cls.model_validate(receipt_kwargs) - def decode_block(self, data: Dict) -> BlockAPI: + def decode_block(self, data: dict) -> BlockAPI: data["hash"] = HexBytes(data["hash"]) if data.get("hash") else None if "gas_limit" in data: data["gasLimit"] = data.pop("gas_limit") @@ -570,7 +576,7 @@ def decode_block(self, data: Dict) -> BlockAPI: return Block.model_validate(data) - def _python_type_for_abi_type(self, abi_type: ABIType) -> Union[Type, Sequence]: + def _python_type_for_abi_type(self, abi_type: ABIType) -> Union[type, Sequence]: # NOTE: An array can be an array of tuples, so we start with an array check if str(abi_type.type).endswith("]"): # remove one layer of the potential onion of array @@ -616,7 +622,7 @@ def encode_calldata(self, abi: Union[ConstructorABI, MethodABI], *args) -> HexBy encoded_calldata = encode(input_types, converted_args) return HexBytes(encoded_calldata) - def decode_calldata(self, abi: Union[ConstructorABI, MethodABI], calldata: bytes) -> Dict: + def decode_calldata(self, abi: Union[ConstructorABI, MethodABI], calldata: bytes) -> dict: raw_input_types = [i.canonical_type for i in abi.inputs] input_types = [parse_type(i.model_dump(mode="json")) for i in abi.inputs] @@ -637,7 +643,7 @@ def decode_calldata(self, abi: Union[ConstructorABI, MethodABI], calldata: bytes return arguments - def decode_returndata(self, abi: MethodABI, raw_data: bytes) -> Tuple[Any, ...]: + def decode_returndata(self, abi: MethodABI, raw_data: bytes) -> tuple[Any, ...]: output_types_str_ls = [o.canonical_type for o in abi.outputs] if raw_data: @@ -721,8 +727,8 @@ def _enrich_value(self, value: Any, **kwargs) -> Any: return value def decode_primitive_value( - self, value: Any, output_type: Union[str, Tuple, List] - ) -> Union[str, HexBytes, Tuple, List]: + self, value: Any, output_type: Union[str, tuple, list] + ) -> Union[str, HexBytes, tuple, list]: if output_type == "address": try: return self.decode_address(value) @@ -819,7 +825,7 @@ def create_transaction(self, **kwargs) -> TransactionAPI: tx_data["data"] = b"" # Deduce the transaction type. - transaction_types: Dict[TransactionType, Type[TransactionAPI]] = { + transaction_types: dict[TransactionType, type[TransactionAPI]] = { TransactionType.STATIC: StaticFeeTransaction, TransactionType.ACCESS_LIST: AccessListTransaction, TransactionType.DYNAMIC: DynamicFeeTransaction, @@ -890,7 +896,7 @@ def create_transaction(self, **kwargs) -> TransactionAPI: return txn_class(**tx_data) - def decode_logs(self, logs: Sequence[Dict], *events: EventABI) -> Iterator["ContractLog"]: + def decode_logs(self, logs: Sequence[dict], *events: EventABI) -> Iterator["ContractLog"]: if not logs: return @@ -921,7 +927,7 @@ def get_abi(_topic: HexStr) -> Optional[LogInputABICollection]: # Since LogABICollection does not have access to the Ecosystem, # the rest of the decoding must happen here. - converted_arguments: Dict = {} + converted_arguments: dict = {} for item in abi.abi.inputs: _type, key, value = item.canonical_type, item.name, event_arguments[item.name] @@ -991,7 +997,7 @@ def enrich_trace(self, trace: TraceAPI, **kwargs) -> TraceAPI: return trace - def _enrich_calltree(self, call: Dict, **kwargs) -> Dict: + def _enrich_calltree(self, call: dict, **kwargs) -> dict: if "contract_id" in call: # Already enriched. return call @@ -1122,11 +1128,11 @@ def _enrich_contract_id(self, address: AddressType, **kwargs) -> str: def _enrich_calldata( self, - call: Dict, + call: dict, method_abi: Union[MethodABI, ConstructorABI], contract_type: ContractType, **kwargs, - ) -> Dict: + ) -> dict: calldata = call["calldata"] if isinstance(calldata, (str, bytes, int)): calldata_arg = HexBytes(calldata) @@ -1156,7 +1162,7 @@ def _enrich_calldata( return call - def _enrich_returndata(self, call: Dict, method_abi: MethodABI, **kwargs) -> Dict: + def _enrich_returndata(self, call: dict, method_abi: MethodABI, **kwargs) -> dict: if "CREATE" in call.get("call_type", ""): call["returndata"] = "" return call @@ -1208,11 +1214,11 @@ def _enrich_returndata(self, call: Dict, method_abi: MethodABI, **kwargs) -> Dic call["returndata"] = output_val return call - def get_python_types(self, abi_type: ABIType) -> Union[Type, Sequence]: + def get_python_types(self, abi_type: ABIType) -> Union[type, Sequence]: return self._python_type_for_abi_type(abi_type) -def parse_type(type_: Dict[str, Any]) -> Union[str, Tuple, List]: +def parse_type(type_: dict[str, Any]) -> Union[str, tuple, list]: if "tuple" not in type_["type"]: return type_["type"] @@ -1220,7 +1226,7 @@ def parse_type(type_: Dict[str, Any]) -> Union[str, Tuple, List]: return [result] if is_array(type_["type"]) else result -def _correct_key(key: str, data: Dict, alt_keys: Tuple[str, ...]) -> Dict: +def _correct_key(key: str, data: dict, alt_keys: tuple[str, ...]) -> dict: if key in data: return data diff --git a/src/ape_ethereum/provider.py b/src/ape_ethereum/provider.py index 5186a80511..48241ee9f4 100644 --- a/src/ape_ethereum/provider.py +++ b/src/ape_ethereum/provider.py @@ -3,10 +3,12 @@ import sys import time from abc import ABC +from collections.abc import Iterator from concurrent.futures import ThreadPoolExecutor +from copy import copy from functools import cached_property, wraps from pathlib import Path -from typing import Any, Dict, Iterator, List, Optional, Union, cast +from typing import Any, Iterable, Optional, Union, cast import ijson # type: ignore import requests @@ -273,7 +275,7 @@ def estimate_gas_cost(self, txn: TransactionAPI, block_id: Optional[BlockID] = N except (ValueError, Web3ContractLogicError) as err: # NOTE: Try to use debug_traceCall to obtain a trace. # And the RPC can be very picky with inputs. - tx_to_trace: Dict = {} + tx_to_trace: dict = {} for key, val in txn_params.items(): if isinstance(val, int): tx_to_trace[key] = hex(val) @@ -379,7 +381,7 @@ def send_call( self, txn: TransactionAPI, block_id: Optional[BlockID] = None, - state: Optional[Dict] = None, + state: Optional[dict] = None, **kwargs: Any, ) -> HexBytes: if block_id is not None: @@ -446,7 +448,17 @@ def send_call( return HexBytes(trace.return_value) - def _eth_call(self, arguments: List) -> HexBytes: + def _eth_call(self, arguments: list) -> HexBytes: + # Force the usage of hex-type to support a wider-range of nodes. + txn_dict = copy(arguments[0]) + if isinstance(txn_dict.get("type"), int): + txn_dict["type"] = HexBytes(txn_dict["type"]).hex() + + # Remove unnecessary values to support a wider-range of nodes. + txn_dict.pop("chainId", None) + + arguments[0] = txn_dict + try: result = self.make_request("eth_call", arguments) except Exception as err: @@ -468,10 +480,8 @@ def _eth_call(self, arguments: List) -> HexBytes: return HexBytes(result) - def _prepare_call(self, txn: TransactionAPI, **kwargs) -> List: - txn_dict = ( - txn.model_dump(by_alias=True, mode="json") if isinstance(txn, TransactionAPI) else txn - ) + def _prepare_call(self, txn: TransactionAPI, **kwargs) -> list: + txn_dict = txn.model_dump(by_alias=True, mode="json") fields_to_convert = ("data", "chainId", "value") for field in fields_to_convert: value = txn_dict.get(field) @@ -522,7 +532,7 @@ def get_receipt( ) from err ecosystem_config = self.network.ecosystem_config.model_dump(by_alias=True, mode="json") - network_config: Dict = ecosystem_config.get(self.network.name, {}) + network_config: dict = ecosystem_config.get(self.network.name, {}) max_retries = network_config.get("max_get_transaction_retries", DEFAULT_MAX_RETRIES_TX) txn = {} for attempt in range(max_retries): @@ -552,7 +562,7 @@ def get_transactions_by_block(self, block_id: BlockID) -> Iterator[TransactionAP if block_id.isnumeric(): block_id = add_0x_prefix(block_id) - block = cast(Dict, self.web3.eth.get_block(block_id, full_transactions=True)) + block = cast(dict, self.web3.eth.get_block(block_id, full_transactions=True)) for transaction in block.get("transactions", []): yield self.network.ecosystem.create_transaction(**transaction) @@ -732,10 +742,10 @@ def poll_logs( self, stop_block: Optional[int] = None, address: Optional[AddressType] = None, - topics: Optional[List[Union[str, List[str]]]] = None, + topics: Optional[list[Union[str, list[str]]]] = None, required_confirmations: Optional[int] = None, new_block_timeout: Optional[int] = None, - events: Optional[List[EventABI]] = None, + events: Optional[list[EventABI]] = None, ) -> Iterator[ContractLog]: events = events or [] if required_confirmations is None: @@ -749,7 +759,7 @@ def poll_logs( if block.number is None: raise ValueError("Block number cannot be None") - log_params: Dict[str, Any] = { + log_params: dict[str, Any] = { "start_block": block.number, "stop_block": block.number, "events": events, @@ -956,7 +966,7 @@ def _post_connect(self): # Register the console contract for trace enrichment self.chain_manager.contracts._cache_contract_type(CONSOLE_ADDRESS, console_contract) - def make_request(self, rpc: str, parameters: Optional[List] = None) -> Any: + def make_request(self, rpc: str, parameters: Optional[Iterable] = None) -> Any: parameters = parameters or [] coroutine = self.web3.provider.make_request(RPCEndpoint(rpc), parameters) result = run_until_complete(coroutine) @@ -986,7 +996,7 @@ def make_request(self, rpc: str, parameters: Optional[List] = None) -> Any: def create_access_list( self, transaction: TransactionAPI, block_id: Optional[BlockID] = None - ) -> List[AccessList]: + ) -> list[AccessList]: """ Get the access list for a transaction use ``eth_createAccessList``. @@ -1068,7 +1078,7 @@ def _handle_execution_reverted( message = str(exception).split(":")[-1].strip() data = None - params: Dict = { + params: dict = { "trace": trace, "contract_address": contract_address, "source_traceback": source_traceback, @@ -1260,7 +1270,7 @@ def _log_connection(self, client_name: str): ) logger.info(f"{msg} {suffix}.") - def ots_get_contract_creator(self, address: AddressType) -> Optional[Dict]: + def ots_get_contract_creator(self, address: AddressType) -> Optional[dict]: if self._ots_api_level is None: return None @@ -1278,7 +1288,7 @@ def _get_contract_creation_receipt(self, address: AddressType) -> Optional[Recei return None - def stream_request(self, method: str, params: List, iter_path: str = "result.item"): + def stream_request(self, method: str, params: Iterable, iter_path: str = "result.item"): payload = {"jsonrpc": "2.0", "id": 1, "method": method, "params": params} results = ijson.sendable_list() coroutine = ijson.items_coro(results, iter_path) diff --git a/src/ape_ethereum/transactions.py b/src/ape_ethereum/transactions.py index b0c7545ac1..7dbb2dd2a4 100644 --- a/src/ape_ethereum/transactions.py +++ b/src/ape_ethereum/transactions.py @@ -1,7 +1,7 @@ import sys from enum import Enum, IntEnum from functools import cached_property -from typing import IO, Any, Dict, List, Optional, Tuple, Union +from typing import IO, Any, Optional, Union from eth_abi import decode from eth_account import Account as EthAccount @@ -53,7 +53,7 @@ class TransactionType(Enum): class AccessList(BaseModel): address: AddressType - storage_keys: List[HexBytes] = Field(default_factory=list, alias="storageKeys") + storage_keys: list[HexBytes] = Field(default_factory=list, alias="storageKeys") class BaseTransaction(TransactionAPI): @@ -122,7 +122,7 @@ class StaticFeeTransaction(BaseTransaction): @model_validator(mode="before") @classmethod - def calculate_read_only_max_fee(cls, values) -> Dict: + def calculate_read_only_max_fee(cls, values) -> dict: # NOTE: Work-around, Pydantic doesn't handle calculated fields well. values["max_fee"] = cls.conversion_manager.convert( values.get("gas_limit", 0), int @@ -138,7 +138,7 @@ class AccessListTransaction(StaticFeeTransaction): gas_price: Optional[int] = Field(None, alias="gasPrice") type: int = Field(TransactionType.ACCESS_LIST.value) - access_list: List[AccessList] = Field(default_factory=list, alias="accessList") + access_list: list[AccessList] = Field(default_factory=list, alias="accessList") @field_validator("type") @classmethod @@ -155,7 +155,7 @@ class DynamicFeeTransaction(BaseTransaction): max_priority_fee: Optional[int] = Field(None, alias="maxPriorityFeePerGas") # type: ignore max_fee: Optional[int] = Field(None, alias="maxFeePerGas") # type: ignore type: int = Field(TransactionType.DYNAMIC.value) - access_list: List[AccessList] = Field(default_factory=list, alias="accessList") + access_list: list[AccessList] = Field(default_factory=list, alias="accessList") @field_validator("type") @classmethod @@ -169,7 +169,7 @@ class SharedBlobTransaction(DynamicFeeTransaction): """ max_fee_per_blob_gas: int = Field(0, alias="maxFeePerBlobGas") - blob_versioned_hashes: List[HexBytes] = Field([], alias="blobVersionedHashes") + blob_versioned_hashes: list[HexBytes] = Field([], alias="blobVersionedHashes") """ Overridden because EIP-4844 states it cannot be nil. @@ -205,7 +205,7 @@ def failed(self) -> bool: return self.status != TransactionStatusEnum.NO_ERROR @cached_property - def debug_logs_typed(self) -> List[Tuple[Any]]: + def debug_logs_typed(self) -> list[tuple[Any]]: """ Extract messages to console outputted by contracts via print() or console.log() statements """ @@ -275,7 +275,7 @@ def show_source_traceback(self, file: IO[str] = sys.stdout): def decode_logs( self, abi: Optional[ - Union[List[Union[EventABI, "ContractEvent"]], Union[EventABI, "ContractEvent"]] + Union[list[Union[EventABI, "ContractEvent"]], Union[EventABI, "ContractEvent"]] ] = None, ) -> ContractLogContainer: if not self.logs: @@ -286,7 +286,7 @@ def decode_logs( if not isinstance(abi, (list, tuple)): abi = [abi] - event_abis: List[EventABI] = [a.abi if not isinstance(a, EventABI) else a for a in abi] + event_abis: list[EventABI] = [a.abi if not isinstance(a, EventABI) else a for a in abi] return ContractLogContainer( self.provider.network.ecosystem.decode_logs(self.logs, *event_abis) ) @@ -302,7 +302,7 @@ def decode_logs( } def get_default_log( - _log: Dict, logs: ContractLogContainer, evt_name: Optional[str] = None + _log: dict, logs: ContractLogContainer, evt_name: Optional[str] = None ) -> ContractLog: log_index = _log.get("logIndex", logs[-1].log_index + 1 if logs else 0) @@ -360,7 +360,7 @@ def get_default_log( return decoded_logs - def _decode_ds_note(self, log: Dict) -> Optional[ContractLog]: + def _decode_ds_note(self, log: dict) -> Optional[ContractLog]: # The first topic encodes the function selector selector, tail = log["topics"][0][:4], log["topics"][0][4:] if sum(tail): diff --git a/src/ape_networks/__init__.py b/src/ape_networks/__init__.py index 96a2ae5ee3..0bac2daf38 100644 --- a/src/ape_networks/__init__.py +++ b/src/ape_networks/__init__.py @@ -1,4 +1,4 @@ -from typing import Dict, List, Optional +from typing import Optional from ape import plugins from ape.api import PluginConfig @@ -25,11 +25,11 @@ class CustomNetwork(PluginConfig): default_provider: str = "node" """The HTTP request header.""" - request_header: Dict = {} + request_header: dict = {} class NetworksConfig(PluginConfig): - custom: List[CustomNetwork] = [] + custom: list[CustomNetwork] = [] @plugins.register(plugins.Config) diff --git a/src/ape_networks/_cli.py b/src/ape_networks/_cli.py index b8558ff183..bc679afa3e 100644 --- a/src/ape_networks/_cli.py +++ b/src/ape_networks/_cli.py @@ -1,5 +1,5 @@ import json -from typing import Callable, Dict +from typing import Callable import click import yaml @@ -62,7 +62,7 @@ def _list(cli_ctx, output_format, ecosystem_filter, network_filter, provider_fil ecosystems = network_data["ecosystems"] ecosystems = sorted(ecosystems, key=lambda e: e["name"]) - def make_sub_tree(data: Dict, create_tree: Callable) -> Tree: + def make_sub_tree(data: dict, create_tree: Callable) -> Tree: name = f"[bold green]{data['name']}" if "isDefault" in data and data["isDefault"]: name += default_suffix diff --git a/src/ape_node/provider.py b/src/ape_node/provider.py index f49d90791a..aecc3b375f 100644 --- a/src/ape_node/provider.py +++ b/src/ape_node/provider.py @@ -2,7 +2,7 @@ import shutil from pathlib import Path from subprocess import DEVNULL, PIPE, Popen -from typing import Dict, List, Optional, Union +from typing import Optional, Union from eth_pydantic_types import HexBytes from eth_typing import HexStr @@ -55,7 +55,7 @@ def __init__( initial_balance: Union[str, int] = to_wei(10000, "ether"), executable: Optional[str] = None, auto_disconnect: bool = True, - extra_funded_accounts: Optional[List[str]] = None, + extra_funded_accounts: Optional[list[str]] = None, hd_path: Optional[str] = DEFAULT_TEST_HD_PATH, ): executable = executable or "geth" @@ -94,7 +94,7 @@ def __init__( addresses.extend(extra_funded_accounts or []) bal_dict = {"balance": str(initial_balance)} alloc = {a: bal_dict for a in addresses} - genesis_data: Dict = { + genesis_data: dict = { "overwrite": True, "coinbase": "0x0000000000000000000000000000000000000000", "difficulty": "0x0", @@ -194,10 +194,10 @@ def wait(self, *args, **kwargs): class EthereumNetworkConfig(PluginConfig): # Make sure you are running the right networks when you try for these - mainnet: Dict = {"uri": get_random_rpc("ethereum", "mainnet")} - sepolia: Dict = {"uri": get_random_rpc("ethereum", "sepolia")} + mainnet: dict = {"uri": get_random_rpc("ethereum", "mainnet")} + sepolia: dict = {"uri": get_random_rpc("ethereum", "sepolia")} # Make sure to run via `geth --dev` (or similar) - local: Dict = {**DEFAULT_SETTINGS.copy(), "chain_id": DEFAULT_TEST_CHAIN_ID} + local: dict = {**DEFAULT_SETTINGS.copy(), "chain_id": DEFAULT_TEST_CHAIN_ID} model_config = SettingsConfigDict(extra="allow") @@ -277,7 +277,7 @@ def start(self, timeout: int = 20): # Include extra accounts to allocated funds to at genesis. extra_accounts = self.settings.ethereum.local.get("extra_funded_accounts", []) extra_accounts.extend(self.provider_settings.get("extra_funded_accounts", [])) - extra_accounts = list(set([HexBytes(a).hex().lower() for a in extra_accounts])) + extra_accounts = list({HexBytes(a).hex().lower() for a in extra_accounts}) test_config["extra_funded_accounts"] = extra_accounts process = GethDevProcess.from_uri(self.uri, self.data_dir, **test_config) @@ -347,7 +347,7 @@ def set_timestamp(self, new_timestamp: int): def mine(self, num_blocks: int = 1): pass - def build_command(self) -> List[str]: + def build_command(self) -> list[str]: return self._process.command if self._process else [] diff --git a/src/ape_node/query.py b/src/ape_node/query.py index 912a1c9204..a76becaa2b 100644 --- a/src/ape_node/query.py +++ b/src/ape_node/query.py @@ -1,5 +1,6 @@ +from collections.abc import Iterator from functools import singledispatchmethod -from typing import Iterator, Optional +from typing import Optional from ape.api import ReceiptAPI from ape.api.query import ContractCreationQuery, QueryAPI, QueryType diff --git a/src/ape_plugins/_cli.py b/src/ape_plugins/_cli.py index 596117d70b..57b2606c88 100644 --- a/src/ape_plugins/_cli.py +++ b/src/ape_plugins/_cli.py @@ -1,7 +1,7 @@ import subprocess import sys from pathlib import Path -from typing import Any, Dict, List, Tuple +from typing import Any import click from packaging.version import Version @@ -32,7 +32,7 @@ def plugins_argument(): or plugins loaded from the local config file. """ - def load_from_file(ctx, file_path: Path) -> List[PluginMetadata]: + def load_from_file(ctx, file_path: Path) -> list[PluginMetadata]: if file_path.is_dir() and (file_path / CONFIG_FILE_NAME).is_file(): file_path = file_path / CONFIG_FILE_NAME @@ -44,7 +44,7 @@ def load_from_file(ctx, file_path: Path) -> List[PluginMetadata]: ctx.obj.logger.warning(f"No plugins found at '{file_path}'.") return [] - def callback(ctx, param, value: Tuple[str]): + def callback(ctx, param, value: tuple[str]): res = [] if not value: ctx.obj.abort("You must give at least one requirement to install.") @@ -117,14 +117,14 @@ def _list(cli_ctx, to_display): @plugins_argument() @skip_confirmation_option("Don't ask for confirmation to install the plugins") @upgrade_option(help="Upgrade the plugin to the newest available version") -def install(cli_ctx, plugins: List[PluginMetadata], skip_confirmation: bool, upgrade: bool): +def install(cli_ctx, plugins: list[PluginMetadata], skip_confirmation: bool, upgrade: bool): """Install plugins""" failures_occurred = False # Track the operations until the end. This way, if validation # fails on one, we can error-out before installing anything. - install_list: List[Dict[str, Any]] = [] + install_list: list[dict[str, Any]] = [] for plugin in plugins: result = plugin._prepare_install(upgrade=upgrade, skip_confirmation=skip_confirmation) diff --git a/src/ape_pm/_cli.py b/src/ape_pm/_cli.py index 1bd9e8396f..af8ab265a0 100644 --- a/src/ape_pm/_cli.py +++ b/src/ape_pm/_cli.py @@ -1,6 +1,5 @@ import json from pathlib import Path -from typing import Tuple import click @@ -331,7 +330,7 @@ def compile(cli_ctx, name, version, force): elif not version: cli_ctx.abort("Please specify --version.") - version_opts: Tuple + version_opts: tuple if version == "local": version_opts = (version,) elif version.startswith("v"): diff --git a/src/ape_pm/compiler.py b/src/ape_pm/compiler.py index 2c4ed22472..b4ee5c339c 100644 --- a/src/ape_pm/compiler.py +++ b/src/ape_pm/compiler.py @@ -1,6 +1,7 @@ import json +from collections.abc import Sequence from pathlib import Path -from typing import List, Optional, Sequence, Set +from typing import Optional from eth_pydantic_types import HexBytes from eth_utils import is_0x_prefixed @@ -17,15 +18,15 @@ class InterfaceCompiler(CompilerAPI): def name(self) -> str: return "ethpm" - def get_versions(self, all_paths: Sequence[Path]) -> Set[str]: + def get_versions(self, all_paths: Sequence[Path]) -> set[str]: # NOTE: This bypasses the serialization of this compiler into the package manifest's # ``compilers`` field. You should not do this with a real compiler plugin. return set() def compile( self, filepaths: Sequence[Path], base_path: Optional[Path] = None - ) -> List[ContractType]: - contract_types: List[ContractType] = [] + ) -> list[ContractType]: + contract_types: list[ContractType] = [] for path in filepaths: source_path = ( get_relative_path(path, base_path) if base_path and path.is_absolute() else path diff --git a/src/ape_test/__init__.py b/src/ape_test/__init__.py index 578c8b80d8..ac7e632329 100644 --- a/src/ape_test/__init__.py +++ b/src/ape_test/__init__.py @@ -1,4 +1,4 @@ -from typing import Dict, List, NewType, Optional, Union +from typing import NewType, Optional, Union from pydantic import NonNegativeInt, field_validator @@ -23,7 +23,7 @@ class GasConfig(PluginConfig): Configuration related to test gas reports. """ - exclude: List[GasExclusion] = [] + exclude: list[GasExclusion] = [] """ Contract methods patterns to skip. Specify ``contract_name:`` and not ``method_name:`` to skip all methods in the contract. Only specify @@ -32,7 +32,7 @@ class GasConfig(PluginConfig): use ``prefix_*`` to skip all items with a certain prefix. """ - reports: List[str] = [] + reports: list[str] = [] """ Report-types to use. Currently, only supports `terminal`. """ @@ -55,7 +55,7 @@ def show(self) -> bool: """Dict is for extra report settings.""" -_ReportType = Union[bool, Dict] +_ReportType = Union[bool, dict] class CoverageReportsConfig(PluginConfig): @@ -99,7 +99,7 @@ class CoverageConfig(PluginConfig): Enable reports. """ - exclude: List[CoverageExclusion] = [] + exclude: list[CoverageExclusion] = [] """ Contract methods patterns to skip. Specify ``contract_name:`` and not ``method_name:`` to skip all methods in the contract. Only specify diff --git a/src/ape_test/_cli.py b/src/ape_test/_cli.py index f8557cc335..2c2bc160f7 100644 --- a/src/ape_test/_cli.py +++ b/src/ape_test/_cli.py @@ -2,9 +2,9 @@ import sys import threading import time +from collections.abc import Sequence from datetime import datetime, timedelta from pathlib import Path -from typing import List, Sequence import click import pytest @@ -43,7 +43,7 @@ def dispatch(self, event: events.FileSystemEvent) -> None: self.process_event(event) @cached_property - def _extensions_to_watch(self) -> List[str]: + def _extensions_to_watch(self) -> list[str]: return [".py", *self.compiler_manager.registered_compilers.keys()] def _is_path_watched(self, filepath: str) -> bool: diff --git a/src/ape_test/accounts.py b/src/ape_test/accounts.py index 788df75963..8cbbfdd8f8 100644 --- a/src/ape_test/accounts.py +++ b/src/ape_test/accounts.py @@ -1,4 +1,5 @@ -from typing import Any, Iterator, List, Optional +from collections.abc import Iterator +from typing import Any, Optional from eip712.messages import EIP712Message from eth_account import Account as EthAccount @@ -17,7 +18,7 @@ class TestAccountContainer(TestAccountContainerAPI): mnemonic: str = "" num_of_accounts: int = 0 hd_path: str = "" - _accounts: List["TestAccount"] = [] + _accounts: list["TestAccount"] = [] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -44,7 +45,7 @@ def config(self): return self.config_manager.get_config("test") @property - def _dev_accounts(self) -> List[GeneratedDevAccount]: + def _dev_accounts(self) -> list[GeneratedDevAccount]: return generate_dev_accounts( self.mnemonic, number_of_accounts=self.num_of_accounts, @@ -72,8 +73,7 @@ def accounts(self) -> Iterator["TestAccount"]: # As TestAccountManager only uses accounts property this works! if self._is_config_changed: self.init() - for account in self._accounts: - yield account + yield from self._accounts def generate_account(self) -> "TestAccountAPI": new_index = self.num_of_accounts + self.num_generated diff --git a/src/ape_test/provider.py b/src/ape_test/provider.py index d5b1bef9a4..0fcabb9730 100644 --- a/src/ape_test/provider.py +++ b/src/ape_test/provider.py @@ -2,7 +2,7 @@ from ast import literal_eval from functools import cached_property from re import Pattern -from typing import Any, Dict, Iterator, Optional, cast +from typing import Any, Iterator, Optional, cast from eth.exceptions import HeaderNotFound from eth_pydantic_types import HexBytes @@ -98,7 +98,7 @@ def disconnect(self): self._evm_backend = None self.provider_settings = {} - def update_settings(self, new_settings: Dict): + def update_settings(self, new_settings: dict): self.provider_settings = {**self.provider_settings, **new_settings} self.disconnect() self.connect() @@ -178,12 +178,12 @@ def send_call( self, txn: TransactionAPI, block_id: Optional[BlockID] = None, - state: Optional[Dict] = None, + state: Optional[dict] = None, **kwargs, ) -> HexBytes: data = txn.model_dump(mode="json", exclude_none=True) state = kwargs.pop("state_override", None) - call_kwargs: Dict = {"block_identifier": block_id, "state_override": state} + call_kwargs: dict = {"block_identifier": block_id, "state_override": state} # Remove unneeded properties data.pop("gas", None) diff --git a/tests/conftest.py b/tests/conftest.py index 53d0c3c75e..4f7c3e3e15 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,10 +4,11 @@ import sys import tempfile import time +from collections.abc import Sequence from contextlib import contextmanager from pathlib import Path from tempfile import mkdtemp -from typing import Any, Callable, Dict, Optional, Sequence +from typing import Any, Callable, Optional import pytest import yaml @@ -297,7 +298,7 @@ def eth_tester_isolation(eth_tester_provider): @pytest.fixture(scope="session") def temp_config(config): @contextmanager - def func(data: Optional[Dict] = None): + def func(data: Optional[dict] = None): data = data or {} with tempfile.TemporaryDirectory() as temp_dir_str: temp_dir = Path(temp_dir_str).resolve() @@ -353,7 +354,7 @@ def _temp_keyfile_account(base_path: Path, alias: str, keyparams, sender): return _temp_keyfile_account -def _make_keyfile_account(base_path: Path, alias: str, params: Dict, funder): +def _make_keyfile_account(base_path: Path, alias: str, params: dict, funder): test_keyfile_path = base_path / f"{alias}.json" if test_keyfile_path.is_file(): @@ -538,7 +539,7 @@ def output(self) -> str: CUSTOM_BLOCK_TIME = 123 -def _make_net(name: str, chain_id: int, **kwargs) -> Dict: +def _make_net(name: str, chain_id: int, **kwargs) -> dict: return {"name": name, "chain_id": chain_id, "ecosystem": "ethereum", **kwargs}