From 980820ed67afefc06226445a3a60a0e29cbbb52c Mon Sep 17 00:00:00 2001 From: Mikko Ohtamaa Date: Tue, 17 Oct 2023 10:51:03 +0200 Subject: [PATCH] First stab on multiple node broadcast --- eth_defi/confirmation.py | 223 +++++++++++++++++++++++++++- eth_defi/hotwallet.py | 2 + eth_defi/provider/fallback.py | 53 ++++++- tests/rpc/test_fallback_provider.py | 45 +++++- 4 files changed, 307 insertions(+), 16 deletions(-) diff --git a/eth_defi/confirmation.py b/eth_defi/confirmation.py index b9aea23b..6b40617b 100644 --- a/eth_defi/confirmation.py +++ b/eth_defi/confirmation.py @@ -1,22 +1,27 @@ """Transaction broadcasting, block confirmation and completion monitoring. - Wait for multiple transactions to be confirmed and read back the results from the blockchain + +- The safest way to get transactions out is to use :py:func:`wait_and_broadcast_multiple_nodes` """ import datetime import logging import time -from typing import Dict, List, Set, Union, cast +from typing import Dict, List, Set, Union, cast, Collection, TypeAlias from eth_account.datastructures import SignedTransaction +from eth_typing import HexStr + +from eth_defi.provider.named import get_provider_name from hexbytes import HexBytes from web3 import Web3 from web3.exceptions import TransactionNotFound from eth_defi.hotwallet import SignedTransactionWithNonce from eth_defi.tx import decode_signed_transaction -from eth_defi.provider.fallback import FallbackProvider - +from eth_defi.provider.fallback import FallbackProvider, get_fallback_provider +from web3.providers import BaseProvider logger = logging.getLogger(__name__) @@ -152,16 +157,18 @@ def wait_transactions_to_complete( unconfirmed_tx_strs = ", ".join([tx_hash.hex() for tx_hash in unconfirmed_txs]) raise ConfirmationTimedOut(f"Transaction confirmation failed. Started: {started_at}, timed out after {max_timeout} ({max_timeout.total_seconds()}s). Poll delay: {poll_delay.total_seconds()}s. 
Still unconfirmed: {unconfirmed_tx_strs}") - # Check if it time to try a better node provider - if isinstance(web3.provider, FallbackProvider): - provider = cast(FallbackProvider, web3.provider) - if datetime.datetime.utcnow() >= next_node_switch: - logger.info( + if datetime.datetime.utcnow() >= next_node_switch: + # Check if it is time to try a better node provider + if isinstance(web3.provider, FallbackProvider): + provider = cast(FallbackProvider, web3.provider) + logger.warning( "Timeout %s reached with this node provider. Trying with alternative node provider.", node_switch_timeout, ) provider.switch_provider() next_node_switch = datetime.datetime.utcnow() + node_switch_timeout + else: + logger.warning("TX confirmation takes long time. No alternative node available: %s", web3.provider) return receipts_received @@ -296,3 +303,203 @@ def broadcast_and_wait_transactions_to_complete( raise RuntimeError(f"Transaction {tx_hash} failed {receipt}") return receipts + + +def _broadcast_multiple_nodes(providers: Collection[BaseProvider], signed_tx: SignedTransaction): + """Attempt to broadcast a transaction through multiple providers.""" + + for p in providers: + + nonce = getattr(signed_tx, "nonce", None) + address = getattr(signed_tx, "address", None) + + name = get_provider_name(p) + logger.info( + "Broadcasting %s through %s", + signed_tx.hash, + name + ) + + # Does not use any middleware + web3 = Web3(p) + try: + web3.eth.send_raw_transaction(signed_tx.rawTransaction) + except ValueError as e: + resp_data: dict = e.args[0] + + # When we rebroadcast we are getting nonce too low errors, + # both for too high and too low nonces + if resp_data["message"] == "nonce too low": + continue + except Exception as e: + logger.warning("Provider %s failed with tx %s from address %s, nonce %s", + name, + signed_tx.hash.hex(), + address, + nonce + ) + logger.exception(e) + + +# Support different raw tx formats +TxType = Union[SignedTransaction, SignedTransactionWithNonce] + + +def
wait_and_broadcast_multiple_nodes( + web3: Web3, + txs: Collection[TxType], + confirmation_block_count: int = 0, + max_timeout=datetime.timedelta(minutes=5), + poll_delay=datetime.timedelta(seconds=1), + node_switch_timeout=datetime.timedelta(minutes=3), +) -> Dict[HexBytes, dict]: + """Try to broadcast transactions through multiple nodes. + + - Broadcast transactions through all nodes + - Wait to confirm + - If ``node_switch_timeout`` is reached, try to confirm using an alternative node + + :param web3: + Web3 instance with :py:class:`eth_defi.provider.fallback.FallbackProvider` + configured as its RPC provider. + + :param txs: + List of transactions to broadcast. + + :param confirmation_block_count: + How many blocks to wait for the transaction receipt to settle. + Set to zero to return as soon as we see the first transaction receipt. + + :param node_switch_timeout: + Switch to alternative fallback node provider + every time we reach this limit. + + Sometimes our node is malfunctioning (LlamaNodes, Ankr) + and does not report transactions in a timely manner. Try with another node. + + See :py:class:`eth_defi.provider.fallback.FallbackProvider` for details.
+ + :return: + Map of transaction hashes -> receipt + + :raise ConfirmationTimedOut: + If we cannot get transactions out + + """ + + assert isinstance(poll_delay, datetime.timedelta) + assert isinstance(max_timeout, datetime.timedelta) + assert isinstance(confirmation_block_count, int) + + if web3.eth.chain_id == 61: + assert confirmation_block_count == 0, "Ethereum Tester chain does not progress itself, so we cannot wait" + + for tx in txs: + assert getattr(tx, "hash", None), f"Does not look like compatible TxType: {tx.__class__}: {tx}" + + provider = get_fallback_provider(web3) + providers = provider.providers + + logger.info( + "Broadcasting %d transactions using %s to confirm in %d blocks, timeout is %s", + len(txs), + ", ".join([get_provider_name(p) for p in providers]), + confirmation_block_count, + max_timeout, + ) + + # Double check nonces before letting txs thru + used_nonces = set() + for tx in txs: + nonce = getattr(tx, "nonce", None) + if nonce is not None: + assert nonce not in used_nonces, f"Nonce used twice: {nonce}" + used_nonces.add(nonce) + + started_at = datetime.datetime.utcnow() + + receipts_received = {} + + unconfirmed_txs: Set[HexBytes] = {tx.hash for tx in txs} + + # Elapsed time after which we raise the log level to be more + # verbose about transaction confirmations + verbose_timeout = max_timeout - datetime.timedelta(minutes=1) + + next_node_switch = started_at + node_switch_timeout + + # Initial broadcast of txs + for tx in txs: + _broadcast_multiple_nodes(providers, tx) + + while len(unconfirmed_txs) > 0: + # Transaction hashes that receive confirmation on this round + confirmation_received = set() + + unconfirmed_tx_hashes = ", ".join(tx_hash.hex() for tx_hash in unconfirmed_txs) + logger.debug("Starting confirmation cycle, unconfirmed txs are %s", unconfirmed_tx_hashes) + + # Bump our verbosity level for the last minute of the wait + if datetime.datetime.utcnow() > started_at + verbose_timeout: + tx_log_level = logging.WARNING + else: + tx_log_level =
logging.DEBUG + + for tx_hash in unconfirmed_txs: + + try: + receipt = web3.eth.get_transaction_receipt(tx_hash) + except TransactionNotFound as e: + # BNB Chain raises this instead of returning None + logger.debug("Transaction not found yet: %s", e) + receipt = None + + if receipt: + tx_confirmations = web3.eth.block_number - receipt["blockNumber"] + if tx_confirmations >= confirmation_block_count: + logger.log( + tx_log_level, + "Confirmed tx %s with %d confirmations", + tx_hash.hex(), + tx_confirmations, + ) + confirmation_received.add(tx_hash) + receipts_received[tx_hash] = receipt + else: + logger.log(tx_log_level, "Still waiting more confirmations. Tx %s with %d confirmations, %d needed", tx_hash.hex(), tx_confirmations, confirmation_block_count) + + # Remove confirmed txs from the working set + unconfirmed_txs -= confirmation_received + + if unconfirmed_txs: + time.sleep(poll_delay.total_seconds()) + + if datetime.datetime.utcnow() > started_at + max_timeout: + for tx_hash in unconfirmed_txs: + try: + tx_data = web3.eth.get_transaction(tx_hash) + logger.error("Data for transaction %s was %s", tx_hash.hex(), tx_data) + except TransactionNotFound as e: + # Happens on LlamaNodes - we have broadcast the transaction + # but its nodes do not see it yet + logger.error("Node missing transaction broadcast %s", tx_hash.hex()) + logger.exception(e) + + unconfirmed_tx_strs = ", ".join([tx_hash.hex() for tx_hash in unconfirmed_txs]) + raise ConfirmationTimedOut(f"Transaction confirmation failed. Started: {started_at}, timed out after {max_timeout} ({max_timeout.total_seconds()}s). Poll delay: {poll_delay.total_seconds()}s. Still unconfirmed: {unconfirmed_tx_strs}") + + if datetime.datetime.utcnow() >= next_node_switch: + # Check if it is time to try a better node provider + logger.warning( + "Timeout %s reached with this node provider.
Trying confirm tx success with an alternative node provider: %s.", + node_switch_timeout, + provider, + ) + provider.switch_provider() + next_node_switch = datetime.datetime.utcnow() + node_switch_timeout + + # Rebroadcast txs again if we suspect a broadcast failed + for tx in txs: + _broadcast_multiple_nodes(providers, tx) + + return receipts_received diff --git a/eth_defi/hotwallet.py b/eth_defi/hotwallet.py index 369552d5..1c71c527 100644 --- a/eth_defi/hotwallet.py +++ b/eth_defi/hotwallet.py @@ -39,6 +39,7 @@ class SignedTransactionWithNonce(NamedTuple): s: int v: int nonce: int + address: str #: Undecoded transaction data as a dict. #: @@ -140,6 +141,7 @@ def sign_transaction_with_new_nonce(self, tx: dict) -> SignedTransactionWithNonc s=_signed.s, nonce=tx["nonce"], source=tx, + address=self.address, ) return signed diff --git a/eth_defi/provider/fallback.py b/eth_defi/provider/fallback.py index e56c1ac6..87b4c77e 100644 --- a/eth_defi/provider/fallback.py +++ b/eth_defi/provider/fallback.py @@ -5,9 +5,10 @@ import enum import time from collections import defaultdict, Counter -from typing import List, Any +from typing import List, Any, cast import logging +from web3 import Web3 from web3.types import RPCEndpoint, RPCResponse from eth_defi.middleware import is_retryable_http_exception, DEFAULT_RETRYABLE_EXCEPTIONS, DEFAULT_RETRYABLE_HTTP_STATUS_CODES, DEFAULT_RETRYABLE_RPC_ERROR_CODES, ProbablyNodeHasNoBlock @@ -139,7 +140,16 @@ def endpoint_uri(self): def switch_provider(self): """Switch to next available provider.""" + provider = self.get_active_provider() + old_provider_name = get_provider_name(provider) self.currently_active_provider = (self.currently_active_provider + 1) % len(self.providers) + new_provider_name = get_provider_name(self.get_active_provider()) + logger.log( + self.switchover_noisiness, + "Switched RPC providers %s -> %s\n", + old_provider_name, + new_provider_name + ) def get_active_provider(self) -> NamedProvider: """Get currently active 
provider. @@ -181,7 +191,7 @@ def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse: return resp_data except Exception as e: - old_provider_name = get_provider_name(provider) + if is_retryable_http_exception( e, retryable_rpc_error_codes=self.retryable_rpc_error_codes, @@ -189,12 +199,22 @@ def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse: retryable_exceptions=self.retryable_exceptions, ): self.switch_provider() - new_provider_name = get_provider_name(self.get_active_provider()) if i < self.retries: # Black messes up string new lines here # See https://github.com/psf/black/issues/1837 - logger.log(self.switchover_noisiness, "Encountered JSON-RPC retryable error %s when calling method:\n" "%s(%s)\n" "Switching providers %s -> %s\n" "Retrying in %f seconds, retry #%d / %d", e, method, params, old_provider_name, new_provider_name, current_sleep, i, self.retries) + logger.log( + self.switchover_noisiness, + "Encountered JSON-RPC retryable error %s when calling method:\n" + "%s(%s)\n " + "Retrying in %f seconds, retry #%d / %d", + e, + method, + params, + current_sleep, + i, + self.retries + ) time.sleep(current_sleep) current_sleep *= self.backoff self.retry_count += 1 @@ -202,7 +222,7 @@ def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse: continue else: raise # Out of retries - logger.info("Will not retry on %s, method %s, as not a retryable exception %s: %s", old_provider_name, method, e.__class__, e) + logger.info("Will not retry, method %s, as not a retryable exception %s: %s", method, e.__class__, e) raise # Not retryable exception raise AssertionError("Should never be reached") @@ -258,4 +278,25 @@ def _check_faulty_rpc_response( # current_sleep = max(self.state_missing_switch_over_delay, current_sleep) raise ProbablyNodeHasNoBlock(f"Node did not have data for block {block_identifier} when calling {method}") - # + +def get_fallback_provider(web3: Web3) -> FallbackProvider: + """Get the fallback 
provider of a Web3 instance. + + Can be nested in :py:class:`eth_defi.provider.mev_block.MEVBlockerProvider`. + + :param web3: + Web3 instance + + :raise AssertionError: + If there is no fallback provider available + """ + provider = web3.provider + if isinstance(provider, FallbackProvider): + return cast(FallbackProvider, provider) + + # MEVBlockerProvider + call_provider = getattr(provider, "call_provider", None) + if call_provider: + return cast(FallbackProvider, call_provider) + + raise AssertionError(f"Does not know how fallback provider is configured: {[provider]}") \ No newline at end of file diff --git a/tests/rpc/test_fallback_provider.py b/tests/rpc/test_fallback_provider.py index 068c00a9..1833fef5 100644 --- a/tests/rpc/test_fallback_provider.py +++ b/tests/rpc/test_fallback_provider.py @@ -1,14 +1,17 @@ """Test JSON-RPC provider fallback mechanism.""" +import datetime import os from unittest.mock import patch, DEFAULT import pytest import requests from eth_account import Account + +from eth_defi.confirmation import wait_and_broadcast_multiple_nodes from eth_defi.provider.broken_provider import get_default_block_tip_latency from web3 import HTTPProvider, Web3 -from eth_defi.anvil import launch_anvil, AnvilLaunch +from eth_defi.provider.anvil import launch_anvil, AnvilLaunch from eth_defi.gas import node_default_gas_price_strategy from eth_defi.hotwallet import HotWallet from eth_defi.middleware import ProbablyNodeHasNoBlock @@ -177,7 +180,7 @@ def test_fallback_nonce_too_low(web3, deployer: str): @pytest.mark.skipif( os.environ.get("JSON_RPC_POLYGON") is None, - reason="Set JSON_RPC_POLYGON environment variable to a privately configured Polygon node", + reason="Set JSON_RPC_POLYGON environment variable to a Polygon node", ) def test_eth_call_not_having_block(fallback_provider: FallbackProvider, provider_1): """What happens if you ask data from non-existing block.""" @@ -205,3 +208,41 @@ def test_eth_call_not_having_block(fallback_provider:
FallbackProvider, provider usdc.contract.functions.balanceOf(ZERO_ADDRESS).call(block_identifier=bad_block) assert fallback_provider.api_retry_counts[0]["eth_call"] == 3 # 5 attempts, 3 retries, the last retry does not count + + +def test_broadcast_and_wait_multiple(web3: Web3, deployer: str): + """Broadcast transactions through multiple nodes. + + In this case, we test by just having multiple fallback providers pointing to the same node. + """ + + web3.eth.set_gas_price_strategy(node_default_gas_price_strategy) + + user = Account.create() + hot_wallet = HotWallet(user) + + # Fill in user wallet + tx1_hash = web3.eth.send_transaction({"from": deployer, "to": user.address, "value": 5 * 10**18}) + assert_transaction_success_with_explanation(web3, tx1_hash) + + hot_wallet.sync_nonce(web3) + + # First send a transaction with a correct nonce + tx2 = {"chainId": web3.eth.chain_id, "from": user.address, "to": deployer, "value": 1 * 10**18, "gas": 30_000} + HotWallet.fill_in_gas_price(web3, tx2) + signed_tx2 = hot_wallet.sign_transaction_with_new_nonce(tx2) + + tx3 = {"chainId": web3.eth.chain_id, "from": user.address, "to": deployer, "value": 1 * 10**18, "gas": 30_000} + HotWallet.fill_in_gas_price(web3, tx3) + signed_tx3 = hot_wallet.sign_transaction_with_new_nonce(tx3) + + # Use low timeouts so this should stress out the logic + receipt_map = wait_and_broadcast_multiple_nodes( + web3, + [signed_tx2, signed_tx3], + max_timeout=datetime.timedelta(seconds=10), + node_switch_timeout=datetime.timedelta(seconds=1), + ) + + assert signed_tx2.hash in receipt_map + assert signed_tx3.hash in receipt_map \ No newline at end of file