"""Configuring and managing multiple JSON-RPC provider connections.
See :ref:`multi rpc` tutorial for details.
"""
import logging
from typing import List, Optional, Any, Dict
import requests
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util import parse_url, Url, Retry
from web3 import Web3, HTTPProvider
from eth_defi.chain import install_chain_middleware
from eth_defi.event_reader.fast_json_rpc import patch_provider, patch_web3
from eth_defi.middleware import static_call_cache_middleware
from eth_defi.provider.anvil import is_anvil
from eth_defi.provider.broken_provider import set_block_tip_latency
from eth_defi.provider.fallback import FallbackProvider
from eth_defi.provider.mev_blocker import MEVBlockerProvider
from eth_defi.provider.named import NamedProvider, get_provider_name
logger = logging.getLogger(__name__)
[docs]class MultiProviderConfigurationError(Exception):
"""Could not parse URL soup for configuring web3"""
[docs]class MultiProviderWeb3(Web3):
"""A web3 instance that knows about multiple RPC endpoints it is using.
- Can use :py:class:`eth_defi.provider.mev_blocker.MEVBlockerProvider`
for making transactions to prevent frontrunning
- There might be several call (read) providers for reading on-chain data
with fallbacks using :py:class:`eth_defi.provider.fallback.FallbackProvider`
See
- :ref:`multi rpc` for details
- :py:func:`create_multi_provider_web3`
"""
[docs] def get_active_transact_provider(self) -> NamedProvider:
"""Get active transact provider.
Can be a call provider if not configured.
"""
provider = self.provider
if isinstance(provider, MEVBlockerProvider):
return provider.transact_provider
return self.get_active_call_provider()
[docs] def get_active_call_provider(self) -> NamedProvider:
"""Get active call provider."""
provider = self.provider
if isinstance(provider, MEVBlockerProvider):
fallback_provider = provider.call_provider
assert isinstance(fallback_provider, FallbackProvider), f"Got: {fallback_provider}"
return fallback_provider.get_active_provider()
else:
assert isinstance(provider, FallbackProvider), f"Got: {provider}"
return provider.get_active_provider()
[docs] def get_fallback_provider(self) -> FallbackProvider:
"""Get the fallback provider multiplexer."""
provider = self.provider
if isinstance(provider, MEVBlockerProvider):
fallback_provider = provider.call_provider
assert isinstance(fallback_provider, FallbackProvider), f"Got: {fallback_provider}"
return fallback_provider
else:
assert isinstance(provider, FallbackProvider), f"Got: {provider}"
return provider
[docs] def switch_to_next_call_provider(self):
"""Recycles to the next call provider (if available)."""
self.get_fallback_provider().switch_provider()
[docs] def get_api_call_counts(self) -> Dict[str, int]:
"""How many times different APIs where called.
:return:
RPC endpoint name, call count dict
"""
return self.get_fallback_provider().get_total_api_call_counts()
[docs]def create_multi_provider_web3(
configuration_line: str,
fallback_sleep=5.0,
fallback_backoff=1.25,
request_kwargs: Optional[Any] = None,
session: Optional[Any] = None,
) -> MultiProviderWeb3:
"""Create a Web3 instance with multi-provider support.
Create a complex Web3 connection manager that
- Supports fail-overs to different providers
- Can have a special execution endpoint
for MEV protection
- HTTP providers are monkey-patched for faster uJSON reading
- HTTP providers have middleware cleared and chain middleware installed
The configuration line is a whitespace separated list of URLs (spaces, newlines, etc.)
using mini configuration language.
- If any of the protocols have `mev+` prefix like `mev+https` then this
endpoint is used for the execution.
Example:
.. code-block:: python
config = "mev+https://rpc.mevblocker.io https://polygon-rpc.com https://bsc-dataseed2.bnbchain.org"
web3 = create_multi_provider_web3(config)
assert get_provider_name(web3.get_fallback_provider()) == "polygon-rpc.com"
assert len(web3.get_fallback_provider().providers) == 2
assert get_provider_name(web3.get_active_transact_provider()) == "rpc.mevblocker.io"
See
- :ref:`multi rpc` for details
:param configuration_line:
Configuration line from an environment variable, config file or similar.
:param fallback_sleep:
Seconds between JSON-RPC call retries.
:param fallback_backoff:
Sleep increase multiplier.
:param request_kwargs:
Passed to HTTPProvider, arguments for :py:mod:`requests` library when doing HTTP requests.
See :py:class:`web3.HTTPProvider` for details.
Example: ``request_kwargs={"timeout": 10.0}``
:param session:
Use specific HTTP 1.1 session with :py:mod:`requests`.
If not given create a default session manager with retry logic.
:return:
Configured Web3 instance with multiple providers
"""
assert type(configuration_line) == str
items = configuration_line.split()
urls: List[Url] = []
for parsable in items:
parsable = parsable.strip()
try:
url = parse_url(parsable)
except Exception as e:
raise MultiProviderConfigurationError(f"Could not parse JSON-RPC configuration URL: {parsable}")
if not url.scheme:
raise MultiProviderConfigurationError(f"Bad URL: {parsable}")
if url in urls:
raise MultiProviderConfigurationError(f"Entry appears twice: {url}")
urls.append(url)
if len(urls) == 0:
raise MultiProviderConfigurationError(f"No configured endpoints")
transact_endpoints = [url.url.replace("mev+", "") for url in urls if url.scheme.startswith("mev+")]
call_endpoints = [url.url for url in urls if not url.scheme.startswith("mev+")]
if len(transact_endpoints) > 1:
raise MultiProviderConfigurationError(f"Only one execution endpoint can be specified, got {transact_endpoints}")
if len(call_endpoints) < 0:
raise MultiProviderConfigurationError(f"At least one call endpoint must be specified, configuration was {configuration_line}")
if session is None:
# https://stackoverflow.com/a/47475019/315168
# TODO: Make these parameters configurable
session = requests.Session()
retry = Retry(connect=3, backoff_factor=0.5)
adapter = HTTPAdapter(max_retries=retry)
session.mount("http://", adapter)
session.mount("https://", adapter)
call_providers = [HTTPProvider(url, request_kwargs=request_kwargs, session=session) for url in call_endpoints]
# Do uJSON patching
for p in call_providers:
_fix_provider(p)
fallback_provider = FallbackProvider(call_providers, sleep=fallback_sleep, backoff=fallback_backoff)
transact_provider = None
if len(transact_endpoints) > 0:
transact_endpoint = transact_endpoints[0]
transact_provider = HTTPProvider(transact_endpoint, request_kwargs=request_kwargs, session=session)
_fix_provider(transact_provider)
provider = MEVBlockerProvider(
call_provider=fallback_provider,
transact_provider=transact_provider,
)
else:
provider = fallback_provider
logger.info(
"Configuring MultiProviderWeb3. Call providers: %s, transact providers %s",
[get_provider_name(c) for c in call_providers],
get_provider_name(transact_provider) if transact_provider else "-",
)
web3 = MultiProviderWeb3(provider)
patch_web3(web3)
web3.middleware_onion.clear()
web3.middleware_onion.inject(static_call_cache_middleware, layer=0)
# Note that this triggers the first RPC call here
install_chain_middleware(web3)
if is_anvil(web3):
# When running against local testing,
# we need to disable block tip latency hacks
set_block_tip_latency(web3, 0)
return web3
def _fix_provider(provider: HTTPProvider):
provider.middlewares.clear()
patch_provider(provider)