# Source code for eth_defi.middleware

"""Web3 middleware.

Most are for dealing with JSON-RPC unreliability issues with retries.

- Taken from exception_retry_request.py from Web3.py

- Modified to support sleep and throttling

- Logs warnings to Python logging subsystem in the case there is need to retry

- See also :py:mod:`eth_defi.provider.broken_provider`.
"""

import logging
import time
from typing import (
    Any,
    Callable,
    Collection,
    Counter,
    Optional,
    Tuple,
    Type,
    TypeAlias,
    Union,
)

from eth_utils.toolz import assoc
from requests.exceptions import (
    ChunkedEncodingError,
    ConnectionError,
    HTTPError,
    Timeout,
    TooManyRedirects,
)
from web3 import Web3
from web3._utils.transactions import get_buffered_gas_estimate
from web3.exceptions import BlockNotFound, ContractLogicError
from web3.middleware.exception_retry_request import check_if_retry_on_failure
from web3.types import Middleware, RPCEndpoint, RPCResponse

#: Module-level logger; retry warnings of the middleware are emitted through it
logger = logging.getLogger(__name__)


#: Exception classes we know we should retry after some timeout.
#:
#: The tuple is handed to :py:func:`isinstance`, so every member must be
#: an exception *class*, not an instance.
#:
#: For ``BlockNotFound`` see also :py:mod:`eth_defi.provider.broken_provider`.
#:
DEFAULT_RETRYABLE_EXCEPTIONS: Tuple[Type[BaseException], ...] = (
    ConnectionError,
    HTTPError,
    Timeout,
    TooManyRedirects,
    # This happens when you ask web3.eth.block_number from Ankr,
    # but if you use it as `block_identifier` in the following call
    # it gives BlockNotFound. This is a problem with Ankr itself,
    # but we'll add it here just to work around this crappy provider
    # by default.
    BlockNotFound,
    # Spit out by LlamaNodes.
    #
    # Their server gives an invalid HTTP reply.
    #
    # requests.exceptions.ChunkedEncodingError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))
    #
    ChunkedEncodingError,
)

#: HTTP status codes we know we might want to retry after a timeout.
#:
#: Taken from https://stackoverflow.com/a/72302017/315168
DEFAULT_RETRYABLE_HTTP_STATUS_CODES = (
    429,
    500,
    502,
    503,
    504,
    # Returned by Alchemy - SSL handshake failed - cause unknown,
    # internal Alchemy failure suspected https://http.dev/525
    525,
    # Returned by Alchemy - CloudFlare: Unknown error
    520,
    # Happens on dRPC:
    # requests.exceptions.HTTPError: 410 Client Error: Gone for url: https://lb.drpc.org/ogrpc?network=avalanche&dkey=xxx
    410,
    # dRPC error:
    # requests.exceptions.HTTPError: 403 Client Error: Forbidden for url: https://lb.drpc.org/ogrpc?network=polygon&dkey=x/
    403,
)

#: JSON-RPC error codes (surfaced as ``ValueError``) we know we might want
#: to retry after a timeout.
#:
#: This is a self-managed list curated by pain.
#:
#: A JSON-RPC error might be mapped to :py:class:`ValueError`
#: if nothing else is available.
#:
#: Example from Pokt Network:
#:
#: `ValueError: {'message': 'Internal JSON-RPC error.', 'code': -32603}`
#:
#: We assume this is a broken RPC node and Pokt will reroute
#: the next retried request to some other node.
#:
#: See GoEthereum error codes https://github.com/ethereum/go-ethereum/blob/master/rpc/errors.go
#:
DEFAULT_RETRYABLE_RPC_ERROR_CODES = (
    # The node provider has a corrupted database or something that
    # GoEthereum cannot handle gracefully.
    # ValueError: {'message': 'Internal JSON-RPC error.', 'code': -32603}
    -32603,
    #
    # NOTE: -32000 is deliberately NOT listed.
    # ValueError: {'code': -32000, 'message': 'nonce too low'}
    # might happen when we are broadcasting multiple transactions through
    # multiple RPC providers using eth_sendRawTransaction and one provider
    # has not yet seen a transaction broadcast through the other provider.
    # However, -32000 is also "Execution reverted" on Alchemy.
    #
    # Anvil variant of "nonce too low", same situation as described above.
    # ValueError: {'code': -32003, 'message': 'nonce too low'}
    -32003,
    # Some error we are getting from LlamaNodes eth_getLogs RPC
    # that we do not know what it is all about.
    # {'code': -32043, 'message': 'Requested data is not available'}
    -32043,
    # eth_getLogs size limit exceeded for a provider /
    # eth_getLogs disabled on some providers.
    # https://github.com/bnb-chain/bsc/issues/1215
    # {'code': -32005, 'message': 'limit exceeded'}
    -32005,
    # Some JSON-RPC provider is buying nodes from allnodes.com and has screwed it up.
    # ValueError: {'code': -32701, 'message': 'Please specify address in your request or, to remove restrictions, order a dedicated full node here: https://www.allnodes.com/bnb/host'}
    -32701,
    # dRPC failure.
    # ValueError: {'message': 'There are not enough CUPs left to cover the CU required for current request.', 'code': 42903}
    42903,
)


#: Because the Ethereum JSON-RPC API is horribly broken,
#: we also need to check for error messages besides error codes.
#:
#: See :py:data:`DEFAULT_RETRYABLE_RPC_ERROR_CODES`.
#:
DEFAULT_RETRYABLE_RPC_ERROR_MESSAGES = {
    # When broadcasting batch transactions, the RPC provider
    # has a load balancer that is not internally coherent.
    "nonce too low",
    # Some random load balancer error?
    # https://github.com/MetaMask/metamask-extension/issues/7234
    "header not found",
    # Error from Alchemy:
    # ValueError: {'code': -32000, 'message': 'execution aborted (timeout = 5s)'}
    "execution aborted (timeout = 5s)",
}

#: Ethereum JSON-RPC methods whose return value never changes
#:
STATIC_CALL_LIST = (
    "eth_chainId",
)


class ProbablyNodeHasNoBlock(Exception):
    """A special exception raised when we suspect JSON-RPC node does not yet have data for a block we asked.

    - Calling a contract on a block before contract was deployed

    - Calling a contract on a block where the node does not have the block data yet

    See :py:mod:`eth_defi.provider.fallback` for details.
    """
def is_retryable_http_exception(
    exc: Exception,
    retryable_exceptions: Tuple[Type[BaseException], ...] = DEFAULT_RETRYABLE_EXCEPTIONS,
    retryable_status_codes: Collection[int] = DEFAULT_RETRYABLE_HTTP_STATUS_CODES,
    retryable_rpc_error_codes: Collection[int] = DEFAULT_RETRYABLE_RPC_ERROR_CODES,
    retryable_rpc_error_messages: Collection[str] = DEFAULT_RETRYABLE_RPC_ERROR_MESSAGES,
    method: str | None = None,
    params: list | None = None,
) -> bool:
    """Helper to check retryable errors from JSON-RPC calls.

    Retryable reasons are connection timeouts, API throttling and such.

    We support various kind of exceptions and HTTP status codes we know we can try.

    :param exc:
        Exception raised by :py:mod:`requests` or Web3 machinery.

    :param retryable_exceptions:
        Exception classes that are always considered retryable.

    :param retryable_status_codes:
        HTTP status codes we can retry. E.g. 429 Too Many requests.

    :param retryable_rpc_error_codes:
        JSON-RPC error codes we can retry.
        See :py:data:`DEFAULT_RETRYABLE_RPC_ERROR_CODES`.

    :param retryable_rpc_error_messages:
        See :py:data:`DEFAULT_RETRYABLE_RPC_ERROR_MESSAGES`.

    :param method:
        JSON-RPC method name we called.

    :param params:
        Method args.

    :return:
        ``True`` if the call that raised ``exc`` may be attempted again.

    :raise RuntimeError:
        If ``exc`` is a ``ValueError`` carrying a dict payload
        whose ``code`` is not an ``int``.
    """

    # Cannot retry mining the block with the same timestamp.
    # ``params`` defaults to None, so test truthiness instead of len().
    if method == "evm_mine" and params:
        return False

    if isinstance(exc, ValueError) and exc.args:
        # raise ValueError(response["error"])
        # ValueError: {'message': 'Internal JSON-RPC error.', 'code': -32603}
        arg = exc.args[0]
        if isinstance(arg, dict):
            code = arg.get("code")
            message = arg.get("message", "")

            # Exact type check on purpose: rejects None and bool alike
            if type(code) is not int:
                raise RuntimeError(f"Bad ValueError: {arg} - {exc}") from exc

            # A JSON-RPC error payload is decided solely by the
            # code/message lists; it never falls through to the checks below.
            return code in retryable_rpc_error_codes or message in retryable_rpc_error_messages

    if isinstance(exc, ProbablyNodeHasNoBlock):
        return True

    if isinstance(exc, HTTPError):
        # HTTP errors are gated by the status code, not by the exception class.
        response = getattr(exc, "response", None)
        if response is None:
            # No response attached, so no status code to inspect:
            # let the caller surface the original error.
            return False
        return response.status_code in retryable_status_codes

    return isinstance(exc, retryable_exceptions)
def exception_retry_middleware(
    make_request: Callable[[RPCEndpoint, Any], RPCResponse],
    web3: "Web3",
    retryable_exceptions: Tuple[Type[BaseException], ...],
    retryable_status_codes: Collection[int],
    retryable_rpc_error_codes: Collection[int],
    retries: int = 10,
    sleep: float = 5.0,
    backoff: float = 1.6,
) -> Callable[[RPCEndpoint, Any], RPCResponse]:
    """Creates middleware that retries failed HTTP requests.

    Is a default middleware for HTTPProvider.

    See :py:func:`http_retry_request_with_sleep_middleware` for usage.

    :param make_request:
        Part of middleware call signature

    :param web3:
        Part of middleware call signature

    :param retryable_exceptions:
        Exception classes passed to :py:func:`is_retryable_http_exception`.

    :param retryable_status_codes:
        HTTP status codes passed to :py:func:`is_retryable_http_exception`.

    :param retryable_rpc_error_codes:
        JSON-RPC error codes passed to :py:func:`is_retryable_http_exception`.

    :param retries:
        Total number of attempts for a whitelisted method.
        At least one attempt is always made.

    :param sleep:
        Seconds to sleep before the first retry.

    :param backoff:
        Multiplier applied to the sleep time after each retry.

    :return:
        Web3.py middleware
    """

    # Never skip the request altogether, even if retries is zero or negative
    attempts = max(retries, 1)

    def middleware(method: RPCEndpoint, params: Any) -> Optional[RPCResponse]:
        # Check if the RPC method is whitelisted for multiple retries
        if not check_if_retry_on_failure(method):
            try:
                return make_request(method, params)
            except Exception as e:
                # Be verbose so that we know our whitelist is missing methods
                raise RuntimeError(f"JSON-RPC failed for non-whitelisted method {method}: {e}") from e

        # Each call starts from the configured base sleep time
        current_sleep = sleep

        # Try to recover from any JSON-RPC node error, sleep and try again
        for i in range(attempts):
            try:
                return make_request(method, params)
            # https://github.com/python/mypy/issues/5349
            except Exception as e:  # type: ignore
                retryable = is_retryable_http_exception(
                    e,
                    retryable_rpc_error_codes=retryable_rpc_error_codes,
                    retryable_status_codes=retryable_status_codes,
                    retryable_exceptions=retryable_exceptions,
                    # Forwarded so that method-specific rules can apply
                    method=method,
                    params=params,
                )

                if not retryable or i >= attempts - 1:
                    # Not retryable exception, or out of retries
                    raise

                logger.warning("Encountered JSON-RPC retryable error %s when calling method %s, retrying in %f seconds, retry #%d", e, method, current_sleep, i + 1)
                time.sleep(current_sleep)
                current_sleep *= backoff

        # Not reachable: the last attempt either returns or raises
        return None

    return middleware
def http_retry_request_with_sleep_middleware(
    make_request: Callable[[RPCEndpoint, Any], Any],
    web3: "Web3",
) -> Callable[[RPCEndpoint, Any], Any]:
    """A HTTP retry middleware with sleep and backoff.

    If you want to customise timeouts, supported exceptions and such
    you can directly create your own middleware
    using :py:func:`exception_retry_middleware`.

    Usage:

    .. code-block:: python

        web3.middleware_onion.clear()
        web3.middleware_onion.inject(http_retry_request_with_sleep_middleware, layer=0)

    :param make_request:
        Part of middleware call signature

    :param web3:
        Part of middleware call signature

    :return:
        Web3.py middleware
    """
    # Retry count, sleep and backoff are left at the defaults
    # of exception_retry_middleware
    return exception_retry_middleware(
        make_request,
        web3,
        retryable_exceptions=DEFAULT_RETRYABLE_EXCEPTIONS,
        retryable_status_codes=DEFAULT_RETRYABLE_HTTP_STATUS_CODES,
        retryable_rpc_error_codes=DEFAULT_RETRYABLE_RPC_ERROR_CODES,
    )
def raise_on_revert_middleware(
    make_request: Callable[[RPCEndpoint, Any], Any],
    web3: "Web3",
) -> Callable[[RPCEndpoint, Any], Any]:
    """Automatically show the transaction revert reason in Python traceback.

    - Designed to make writing unit tests more productive

    - Transaction will already revert in `eth_estimateGas` call unless you
      have manually set the gas limit for your transaction

    - If a transaction fails, this middleware display its revert reason in Python exception message

    - Tested with Anvil testing backend

    - May interfere with :py:func:`http_retry_request_with_sleep_middleware`, others,
      so don't use in production

    .. code-block:: python

        from eth_defi.middleware import raise_on_revert_middleware

        # Replace the web3.py stock gas estimate middleware with a smarter one
        web3.middleware_onion.replace("gas_estimate", raise_on_revert_middleware)

        # Now you check the revert reason as the following

    :param make_request:
        Part of middleware call signature

    :param web3:
        Part of middleware call signature

    :return:
        Web3.py middleware
    """

    def middleware(method: RPCEndpoint, params: Any) -> RPCResponse:
        if method == "eth_sendTransaction":
            transaction = params[0]
            if "gas" not in transaction:
                # No explicit gas limit: fill it from a buffered gas estimate
                # before the transaction is passed on.
                transaction = assoc(
                    transaction,
                    "gas",
                    hex(get_buffered_gas_estimate(web3, transaction)),
                )
                return make_request(method, [transaction])
        return make_request(method, params)

    return middleware
#
#
#
# Monkey patch
# https://github.com/ethereum/web3.py/issues/2936
#
#

from eth_utils.toolz import compose
from web3._utils.transactions import fill_nonce, fill_transaction_defaults
from web3.middleware.signing import format_transaction, gen_normalized_accounts
from web3.types import Middleware, RPCEndpoint, RPCResponse
def construct_sign_and_send_raw_middleware_anvil(
    private_key_or_account,
) -> Middleware:
    """Capture transactions sign and send as raw transactions

    .. note ::

        This is web3.py middleware that has been fixed for Anvil/other JSON-RPC compatibility.

    :param private_key_or_account:
        A single private key or a tuple, list or set of private keys.

        Keys can be any of the following formats:

        - An eth_account.LocalAccount object
        - An eth_keys.PrivateKey object
        - A raw private key as a hex string or byte string

    :return:
        Web3.py middleware
    """

    accounts = gen_normalized_accounts(private_key_or_account)

    def sign_and_send_raw_middleware(make_request: Callable[[RPCEndpoint, Any], Any], w3: "Web3") -> Callable[[RPCEndpoint, Any], RPCResponse]:
        format_and_fill_tx = compose(format_transaction, fill_transaction_defaults(w3), fill_nonce(w3))

        def middleware(method: RPCEndpoint, params: Any) -> RPCResponse:
            # Only eth_sendTransaction is intercepted
            if method != "eth_sendTransaction":
                return make_request(method, params)

            transaction = format_and_fill_tx(params[0])

            # Pass the original request through untouched when the sender
            # is missing or is not one of the accounts we hold a key for.
            if "from" not in transaction or transaction.get("from") not in accounts:
                return make_request(method, params)

            account = accounts[transaction["from"]]
            raw_tx = account.sign_transaction(transaction).rawTransaction

            # The signed payload is sent hex encoded, not as raw bytes
            return make_request(RPCEndpoint("eth_sendRawTransaction"), [raw_tx.hex()])

        return middleware

    return sign_and_send_raw_middleware
def static_call_cache_middleware(
    make_request: Callable[[RPCEndpoint, Any], Any],
    web3: "Web3",
) -> Callable[[RPCEndpoint, Any], Any]:
    """Cache JSON-RPC call values that never change.

    The cache is web3 instance itself, to allow sharing the cache
    between different JSON-RPC providers.

    Only methods listed in :py:data:`STATIC_CALL_LIST` are cached,
    and only when the response does not carry an ``error`` entry.

    :param make_request:
        Part of middleware call signature

    :param web3:
        Part of middleware call signature. The cache dict is stored
        on it as the ``static_call_cache`` attribute.

    :return:
        Web3.py middleware
    """

    def middleware(method: RPCEndpoint, params: Any) -> RPCResponse:
        # Everything outside the static list always goes to the node
        # and is never stored.
        if method not in STATIC_CALL_LIST:
            return make_request(method, params)

        cache = getattr(web3, "static_call_cache", None)
        if cache is None:
            cache = {}
            web3.static_call_cache = cache

        cached = cache.get(method)
        if cached is not None:
            return cached

        resp = make_request(method, params)

        # Do not pin a failed call: an error response would otherwise
        # be served from the cache forever.
        if isinstance(resp, dict) and resp.get("error") is None:
            cache[method] = resp

        return resp

    return middleware