Skip to content

Commit

Permalink
First stab on multiple node broadcast
Browse files Browse the repository at this point in the history
  • Loading branch information
miohtama committed Oct 17, 2023
1 parent 7a55cfd commit 980820e
Show file tree
Hide file tree
Showing 4 changed files with 307 additions and 16 deletions.
223 changes: 215 additions & 8 deletions eth_defi/confirmation.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
"""Transaction broadcasting, block confirmation and completion monitoring.
- Wait for multiple transactions to be confirmed and read back the results from the blockchain
- The safest way to get transactions out is to use :py:func:`wait_and_broadcast_multiple_nodes`
"""

import datetime
import logging
import time
from typing import Dict, List, Set, Union, cast
from typing import Dict, List, Set, Union, cast, Collection, TypeAlias

from eth_account.datastructures import SignedTransaction
from eth_typing import HexStr

from eth_defi.provider.named import get_provider_name
from hexbytes import HexBytes
from web3 import Web3
from web3.exceptions import TransactionNotFound

from eth_defi.hotwallet import SignedTransactionWithNonce
from eth_defi.tx import decode_signed_transaction
from eth_defi.provider.fallback import FallbackProvider

from eth_defi.provider.fallback import FallbackProvider, get_fallback_provider
from web3.providers import BaseProvider

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -152,16 +157,18 @@ def wait_transactions_to_complete(
unconfirmed_tx_strs = ", ".join([tx_hash.hex() for tx_hash in unconfirmed_txs])
raise ConfirmationTimedOut(f"Transaction confirmation failed. Started: {started_at}, timed out after {max_timeout} ({max_timeout.total_seconds()}s). Poll delay: {poll_delay.total_seconds()}s. Still unconfirmed: {unconfirmed_tx_strs}")

# Check if it time to try a better node provider
if isinstance(web3.provider, FallbackProvider):
provider = cast(FallbackProvider, web3.provider)
if datetime.datetime.utcnow() >= next_node_switch:
logger.info(
if datetime.datetime.utcnow() >= next_node_switch:
# Check if it time to try a better node provider
if isinstance(web3.provider, FallbackProvider):
provider = cast(FallbackProvider, web3.provider)
logger.warning(
"Timeout %s reached with this node provider. Trying with alternative node provider.",
node_switch_timeout,
)
provider.switch_provider()
next_node_switch = datetime.datetime.utcnow() + node_switch_timeout
else:
logger.warning("TX confirmation takes long time. No alternative node available: %s", web3.provider)

return receipts_received

Expand Down Expand Up @@ -296,3 +303,203 @@ def broadcast_and_wait_transactions_to_complete(
raise RuntimeError(f"Transaction {tx_hash} failed {receipt}")

return receipts


def _broadcast_multiple_nodes(providers: Collection[BaseProvider], signed_tx: SignedTransaction):
"""Attempt to broadcast a transaction through multiple provideres."""

for p in providers:

nonce = getattr(signed_tx, "nonce", None)
address = getattr(signed_tx, "address", None)

name = get_provider_name(p)
logger.info(
"Broadcasting %s through %s",
signed_tx.hash,
name
)

# Does not use any middleware
web3 = Web3(p)
try:
web3.eth.send_raw_transaction(signed_tx.rawTransaction)
except ValueError as e:
resp_data: dict = e.args[0]

# When we rebroadcast we are getting nonce too low errors,
# both for too high and too low nonces
if resp_data["message"] == "nonce too low":
continue
except Exception as e:
logger.warning("Provider %s failed with tx %s from address %s, nonce %s",
name,
signed_tx.hash.hex(),
address,
nonce
)
logger.exception(e)


# Support different raw tx formats
TxType = Union[SignedTransaction, SignedTransactionWithNonce]


def wait_and_broadcast_multiple_nodes(
web3: Web3,
txs: Collection[TxType],
confirmation_block_count: int = 0,
max_timeout=datetime.timedelta(minutes=5),
poll_delay=datetime.timedelta(seconds=1),
node_switch_timeout=datetime.timedelta(minutes=3),
) -> Dict[HexBytes, dict]:
"""Try to broadcast transactions through multiple nodes.
- Broadcast transaction through all nodes
- Wait to confirm
- If ``node_switch_timeout`` is reached, try to confirm using an alternative node
:param web3:
Web3 instance with :py:class:`eth_defi.provider.fallback.FallbackProvider`
configured as its RPC provider.
:param txs:
List of transaction to broadcast.
:param confirmation_block_count:
How many blocks wait for the transaction receipt to settle.
Set to zero to return as soon as we see the first transaction receipt.
:param node_switch_timeout:
Switch to alternative fallback node provider
every time we reach this limit.
Sometimes our node is malfunctioning (LlamaNodes, Ankr)
and does not report transactions timely. Try with another node.
See :py:class:`eth_defi.provider.fallback.FallbackProvider` for details.
:return:
Map of transaction hashes -> receipt
:raise ConfirmationTimedOut:
If we cannot get transactions out
"""

assert isinstance(poll_delay, datetime.timedelta)
assert isinstance(max_timeout, datetime.timedelta)
assert isinstance(confirmation_block_count, int)

if web3.eth.chain_id == 61:
assert confirmation_block_count == 0, "Ethereum Tester chain does not progress itself, so we cannot wait"

for tx in txs:
assert getattr(tx, "hash", None), f"Does not look like compatible TxType: {tx.__class__}: {tx}"

provider = get_fallback_provider(web3)
providers = provider.providers

logger.info(
"Broadcasting %d transactions using %s to confirm in %d blocks, timeout is %s",
len(txs),
", ".join([get_provider_name(p) for p in providers]),
confirmation_block_count,
max_timeout,
)

# Double check nonces before letting txs thru
used_nonces = set()
for tx in txs:
nonce = getattr(tx, "nonce", None)
if nonce is not None:
assert nonce not in used_nonces, f"Nonce used twice: {nonce}"
used_nonces.add(nonce)

started_at = datetime.datetime.utcnow()

receipts_received = {}

unconfirmed_txs: Set[HexBytes] = {tx.hash for tx in txs}

# When we switch to level to verbose to be more
# concerned with our debug logging
verbose_timeout = max_timeout - datetime.timedelta(minutes=1)

next_node_switch = started_at + node_switch_timeout

# Initial broadcast of txs
for tx in txs:
_broadcast_multiple_nodes(providers, tx)

while len(unconfirmed_txs) > 0:
# Transaction hashes that receive confirmation on this round
confirmation_received = set()

unconfirmed_tx_hashes = ", ".join(tx_hash.hex() for tx_hash in unconfirmed_txs)
logger.debug("Starting confirmation cycle, unconfirmed txs are %s", unconfirmed_tx_hashes)

# Bump our verbosiveness levels for the last minutes of wait
if datetime.datetime.utcnow() > started_at + verbose_timeout:
tx_log_level = logging.WARNING
else:
tx_log_level = logging.DEBUG

for tx_hash in unconfirmed_txs:

try:
receipt = web3.eth.get_transaction_receipt(tx_hash)
except TransactionNotFound as e:
# BNB Chain get does this instead of returning None
logger.debug("Transaction not found yet: %s", e)
receipt = None

if receipt:
tx_confirmations = web3.eth.block_number - receipt["blockNumber"]
if tx_confirmations >= confirmation_block_count:
logger.log(
tx_log_level,
"Confirmed tx %s with %d confirmations",
tx_hash.hex(),
tx_confirmations,
)
confirmation_received.add(tx_hash)
receipts_received[tx_hash] = receipt
else:
logger.log(tx_log_level, "Still waiting more confirmations. Tx %s with %d confirmations, %d needed", tx_hash.hex(), tx_confirmations, confirmation_block_count)

# Remove confirmed txs from the working set
unconfirmed_txs -= confirmation_received

if unconfirmed_txs:
time.sleep(poll_delay.total_seconds())

if datetime.datetime.utcnow() > started_at + max_timeout:
for tx_hash in unconfirmed_txs:
try:
tx_data = web3.eth.get_transaction(tx_hash)
logger.error("Data for transaction %s was %s", tx_hash.hex(), tx_data)
except TransactionNotFound as e:
# Happens on LlamaNodes - we have broadcasted the transaction
# but its nodes do not see it yet
logger.error("Node missing transaction broadcast %s", tx_hash.hex())
logger.exception(e)

unconfirmed_tx_strs = ", ".join([tx_hash.hex() for tx_hash in unconfirmed_txs])
raise ConfirmationTimedOut(f"Transaction confirmation failed. Started: {started_at}, timed out after {max_timeout} ({max_timeout.total_seconds()}s). Poll delay: {poll_delay.total_seconds()}s. Still unconfirmed: {unconfirmed_tx_strs}")

if datetime.datetime.utcnow() >= next_node_switch:
# Check if it time to try a better node provider
logger.warning(
"Timeout %s reached with this node provider. Trying confirm tx success with an alternative node provider: %s.",
node_switch_timeout,
provider,
)
provider.switch_provider()
next_node_switch = datetime.datetime.utcnow() + node_switch_timeout

# Rebroadcast txs again if we suspect a broadcast failed
for tx in txs:
_broadcast_multiple_nodes(providers, tx)

return receipts_received
2 changes: 2 additions & 0 deletions eth_defi/hotwallet.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class SignedTransactionWithNonce(NamedTuple):
s: int
v: int
nonce: int
address: str

#: Undecoded transaction data as a dict.
#:
Expand Down Expand Up @@ -140,6 +141,7 @@ def sign_transaction_with_new_nonce(self, tx: dict) -> SignedTransactionWithNonc
s=_signed.s,
nonce=tx["nonce"],
source=tx,
address=self.address,
)
return signed

Expand Down
53 changes: 47 additions & 6 deletions eth_defi/provider/fallback.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
import enum
import time
from collections import defaultdict, Counter
from typing import List, Any
from typing import List, Any, cast
import logging

from web3 import Web3
from web3.types import RPCEndpoint, RPCResponse

from eth_defi.middleware import is_retryable_http_exception, DEFAULT_RETRYABLE_EXCEPTIONS, DEFAULT_RETRYABLE_HTTP_STATUS_CODES, DEFAULT_RETRYABLE_RPC_ERROR_CODES, ProbablyNodeHasNoBlock
Expand Down Expand Up @@ -139,7 +140,16 @@ def endpoint_uri(self):

def switch_provider(self):
"""Switch to next available provider."""
provider = self.get_active_provider()
old_provider_name = get_provider_name(provider)
self.currently_active_provider = (self.currently_active_provider + 1) % len(self.providers)
new_provider_name = get_provider_name(self.get_active_provider())
logger.log(
self.switchover_noisiness,
"Switched RPC providers %s -> %s\n",
old_provider_name,
new_provider_name
)

def get_active_provider(self) -> NamedProvider:
"""Get currently active provider.
Expand Down Expand Up @@ -181,28 +191,38 @@ def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse:
return resp_data

except Exception as e:
old_provider_name = get_provider_name(provider)

if is_retryable_http_exception(
e,
retryable_rpc_error_codes=self.retryable_rpc_error_codes,
retryable_status_codes=self.retryable_status_codes,
retryable_exceptions=self.retryable_exceptions,
):
self.switch_provider()
new_provider_name = get_provider_name(self.get_active_provider())

if i < self.retries:
# Black messes up string new lines here
# See https://github.com/psf/black/issues/1837
logger.log(self.switchover_noisiness, "Encountered JSON-RPC retryable error %s when calling method:\n" "%s(%s)\n" "Switching providers %s -> %s\n" "Retrying in %f seconds, retry #%d / %d", e, method, params, old_provider_name, new_provider_name, current_sleep, i, self.retries)
logger.log(
self.switchover_noisiness,
"Encountered JSON-RPC retryable error %s when calling method:\n"
"%s(%s)\n "
"Retrying in %f seconds, retry #%d / %d",
e,
method,
params,
current_sleep,
i,
self.retries
)
time.sleep(current_sleep)
current_sleep *= self.backoff
self.retry_count += 1
self.api_retry_counts[self.currently_active_provider][method] += 1
continue
else:
raise # Out of retries
logger.info("Will not retry on %s, method %s, as not a retryable exception %s: %s", old_provider_name, method, e.__class__, e)
logger.info("Will not retry, method %s, as not a retryable exception %s: %s", method, e.__class__, e)
raise # Not retryable exception

raise AssertionError("Should never be reached")
Expand Down Expand Up @@ -258,4 +278,25 @@ def _check_faulty_rpc_response(
# current_sleep = max(self.state_missing_switch_over_delay, current_sleep)
raise ProbablyNodeHasNoBlock(f"Node did not have data for block {block_identifier} when calling {method}")

#

def get_fallback_provider(web3: Web3) -> FallbackProvider:
"""Get the fallback provider of a Wen3 instance.
Can be nested in :py:class:`eth_defi.provider.mev_block.MEVBlockerProvider`.
:param web3:
Web3 instance
:raise AssertionError:
If there is no fallback provider available
"""
provider = web3.provider
if isinstance(provider, FallbackProvider):
return cast(FallbackProvider, provider)

# MEVBlockerProvider
call_provider = getattr(provider, "call_provider", None)
if call_provider:
return cast(FallbackProvider, call_provider)

raise AssertionError(f"Does not know how fallback provider is configured: {[provider]}")
Loading

0 comments on commit 980820e

Please sign in to comment.