"""Price oracle implementation for Uniswap v2 pools."""
import datetime
from collections import Counter
from dataclasses import dataclass
from web3 import Web3
from eth_defi.abi import get_contract
from eth_defi.event_reader.conversion import convert_int256_bytes_to_int, decode_data
from eth_defi.event_reader.filter import Filter
from eth_defi.event_reader.logresult import LogContext
from eth_defi.event_reader.reader import extract_timestamps_json_rpc, read_events
from eth_defi.price_oracle.oracle import PriceEntry, PriceOracle, PriceSource
from eth_defi.uniswap_v2.pair import PairDetails, fetch_pair_details
[docs]@dataclass
class UniswapV2PriceOracleContext(LogContext):
"""Hold data about tokens in in the pool"""
pair: PairDetails
reverse_token_order: bool
[docs]def convert_sync_log_result_to_price_entry(log: dict) -> PriceEntry:
"""Create a price entry based on Sync eth_getLogs result.
Called by :py:func:`update_price_oracle_with_sync_events_single_thread`.
"""
context: UniswapV2PriceOracleContext = log["context"]
# Check our JSON-RPC has not served us something bad
assert log["address"] == context.pair.address.lower(), f"Got wrong source address for Sync event. Expected pair contract {context.pair.address}, got {log['address']}"
# {'address': '0xa6db9e0061cfb22da5755621bb363cdfe06057da',
# 'topics': ['0x1c411e9a96e071241c2f21f7726b17ae89e3cab4c78be50e062b03a9fffbbad1'],
# 'data': '0x000000000000000000000000000000000000000001b298e4a3c23039f6762b3b00000000000000000000000000000000000000000000000fdf9c1b5944aaa9a2',
# 'blockNumber': '0xd59f80', 'transactionHash': '0x45cac9ba3f7a3bc93efecfa56c1aadffb31b5099842070d054e7b6111f1ac9bc', 'transactionIndex': '0x1', 'blockHash': '0x83447948658a5ae2dd1295cce35a85b61623fb9611578bcca65c4c918cbe3985', 'logIndex': '0x3', 'removed': False, 'context': None, 'event': <class 'web3._utils.datatypes.Sync'>, 'timestamp': 1641086320}
timestamp = datetime.datetime.utcfromtimestamp(log["timestamp"])
# Chop data blob to byte32 entries
data_entries = decode_data(log["data"])
reserve0 = convert_int256_bytes_to_int(data_entries[0])
reserve1 = convert_int256_bytes_to_int(data_entries[1])
assert reserve0 > 0
assert reserve1 > 0
price = context.pair.convert_price_to_human(
reserve0,
reserve1,
context.reverse_token_order,
)
return PriceEntry(
timestamp=timestamp,
price=price,
volume=None, # For volume you would also need to get matching Swap() event
block_number=log["blockNumber"],
source=PriceSource.uniswap_v2_like_pool_sync_event,
pool_contract_address=log["address"],
block_hash=log["blockHash"],
tx_hash=log["transactionHash"],
)
#
# def update_price_oracle_with_sync_events(
# oracle: PriceOracle,
# executor: futureproof.ThreadPoolExecutor,
# web3_factory: Web3Factory,
# pair_contract_address: str,
# start_block: int,
# end_block: int,
# thread_pool_executor: Optional[futureproof.ThreadPoolExecutor],
# ):
# """Feed price oracle data for a given block range.
#
# - Uses optimised parallel reading thread pool implmentation
#
# - Uses fast multithreaded pool for the event fetch
# """
#
# web3 = Web3Factory(None)
#
# Pair = get_contract(web3, "UniswapV2Pair.json")
#
# events = [
# Pair.events.Sync
# ]
#
# signatures = Pair.events.Sync.build_filter().topics
# assert len(signatures) == 1
#
# filter = Filter(
# contract_address=pair_contract_address,
# bloom=None,
# topics={
# signatures[0]: Pair.events.Sync,
# }
# )
#
# for log_result in read_events_concurrent(
# web3_factory,
# start_block,
# end_block,
# [Pair.events.Sync],
# notify=None,
# chunk_size=100,
# filter=filter,
# context=None,
# ):
# import ipdb ; ipdb.set_trace()
[docs]def update_price_oracle_with_sync_events_single_thread(
oracle: PriceOracle,
web3: Web3,
pair_contract_address: str,
start_block: int,
end_block: int,
reverse_token_order=False,
):
"""Feed price oracle data for a given block range.
A slow single threaded implementation - suitable for testing.
Example:
.. code-block: python
# Randomly chosen block range.
start_block = 14_000_000
end_block = 14_000_100
pair_details = fetch_pair_details(web3, bnb_busd_address)
assert pair_details.token0.symbol == "WBNB"
assert pair_details.token1.symbol == "BUSD"
oracle = PriceOracle(
time_weighted_average_price,
max_age=PriceOracle.ANY_AGE, # We are dealing with historical data
min_duration=datetime.timedelta(minutes=1),
)
update_price_oracle_with_sync_events_single_thread(
oracle,
web3,
bnb_busd_address,
start_block,
end_block
)
assert oracle.calculate_price() == pytest.approx(Decimal('523.8243566658033237353702655'))
:param oracle:
Price oracle to update
:param web3:
Web3 connection we use to fetch Sync event data from JSON-RPC node
:param start_block:
First block to include data for
:param end_block:
Last block to include data for (inclusive)
:param reverse_token_order:
If pair token0 is the quote token to calculate the price.
"""
assert pair_contract_address
Pair = get_contract(web3, "sushi/UniswapV2Pair.json")
signatures = Pair.events.Sync.build_filter().topics
assert len(signatures) == 1
filter = Filter(
contract_address=pair_contract_address,
bloom=None,
topics={
signatures[0]: Pair.events.Sync,
},
)
pool_details = fetch_pair_details(web3, pair_contract_address)
# Feed oracle with event data from JSON-RPC node
for log_result in read_events(
web3,
start_block,
end_block,
[Pair.events.Sync],
notify=None,
chunk_size=100,
filter=filter,
context=UniswapV2PriceOracleContext(pool_details, reverse_token_order),
):
entry = convert_sync_log_result_to_price_entry(log_result)
oracle.add_price_entry(entry)
[docs]def update_live_price_feed(
oracle: PriceOracle,
web3: Web3,
pair_contract_address: str,
reverse_token_order=False,
lookback_block_count: int = 5,
) -> Counter:
"""Fetch live price of Uniswap v2 pool by listening to Sync event.
We use HTTP polling method, as HTTP polling is supported by free nodes.
.. warning::
We do not have bullet-proof logic to deal with minor chain reorgs.
Some transactions can hop blocks and be rejected in later blocks,
and we do not deal with this.
This is a simple example implementation and may not suitable
for production usage.
:return:
Debug stats
"""
stats = Counter(
{
"created": 0,
"reorgs": 0,
"discarded": 0,
}
)
Pair = get_contract(web3, "sushi/UniswapV2Pair.json")
events = [Pair.events.Sync]
pair_details = fetch_pair_details(web3, pair_contract_address)
filter = Filter.create_filter(pair_contract_address, events)
current_block = web3.eth.block_number
start_block = current_block - lookback_block_count
end_block = current_block
# Feed oracle with event data from JSON-RPC node
for log_result in read_events(
web3,
start_block,
end_block,
[Pair.events.Sync],
notify=None,
chunk_size=100,
filter=filter,
context=UniswapV2PriceOracleContext(pair_details, reverse_token_order),
):
entry = convert_sync_log_result_to_price_entry(log_result)
hopped = oracle.add_price_entry_reorg_safe(entry)
if hopped:
stats["reorgs"] += 1
else:
stats["created"] += 1
# Get the last block timestamp
timestamps = extract_timestamps_json_rpc(web3, end_block, end_block)
unix_timestamp = next(iter(timestamps.values()))
last_timestamp = datetime.datetime.utcfromtimestamp(unix_timestamp)
oracle.update_last_refresh(end_block, last_timestamp)
# Clean old data
stats["discarded"] = oracle.truncate_buffer(last_timestamp)
return stats