# Source code for eth_defi.uniswap_v3.utils

"""Uniswap v3 helper functions."""
import math
from typing import Optional, Tuple

from eth_typing import HexAddress
from web3 import Web3

from eth_defi.uniswap_v3.constants import (
    DEFAULT_TICK_SPACINGS,
    MAX_TICK,
    MIN_TICK,
    UNISWAP_V3_SUBGRAPH_URL,
)


def encode_sqrt_ratio_x96(*, amount0: int, amount1: int) -> int:
    """Returns the sqrt ratio as a Q64.96 corresponding to a given ratio of amount1 and amount0

    The result is ``floor(sqrt((amount1 << 192) / amount0))`` computed entirely
    in integer arithmetic, so it is exact for arbitrarily large inputs.

    :param int amount0: the denominator amount, i.e amount of token0
    :param int amount1: the numerator amount, i.e. amount of token1
    :return: the sqrt ratio
    :raise ZeroDivisionError: if ``amount0`` is zero
    :raise ValueError: if the ratio is negative

    `Lifted from StakeWise Oracle (AGPL license) <https://github.com/stakewise/oracle/blob/master/oracle/oracle/distributor/uniswap_v3.py#L547>`__.
    """
    # Shifting by 192 bits before the square root leaves 96 fractional bits
    # in the result (sqrt(2**192) == 2**96).
    numerator: int = amount1 << 192
    denominator: int = amount0
    ratio_x192: int = numerator // denominator
    # math.isqrt is exact for big integers; math.sqrt would round through a
    # 53-bit float and corrupt the low bits of the ~96+ bit result.
    return math.isqrt(ratio_x192)
def encode_path(
    path: list[HexAddress],
    fees: list,
    exact_output: bool = False,
) -> bytes:
    """Encode the routing path to be suitable to use with Quoter and SwapRouter.

    For example if we would like to route the swap from token1 -> token3 through 2 pools:

    * pool1: token1/token2
    * pool2: token2/token3

    then encoded path would have this format: `token1 - pool1's fee - token2 - pool2's - token3`,
    in which each token address length is 20 bytes and fee length is 3 bytes

    `Read more <https://github.com/Uniswap/v3-periphery/blob/22a7ead071fff53f00d9ddc13434f285f4ed5c7d/contracts/libraries/Path.sol>`__.

    The ``path`` and ``fees`` arguments are never modified.

    :param path: List of token addresses how to route the trade
    :param fees: List of trading fees of the pools in the route
    :param exact_output: Whether the encoded path be used for exactOutput quote or swap
    :return: The encoded path: token addresses (20 bytes each) interleaved with fees (3 bytes each)
    """
    assert len(fees) == len(path) - 1

    if exact_output:
        # Reverse into new lists so the caller's arguments stay untouched.
        path = list(reversed(path))
        fees = list(reversed(fees))

    encoded = bytearray()
    last_index = len(path) - 1
    for index, token in enumerate(path):
        # Strip the leading "0x" before decoding the hex address.
        encoded += bytes.fromhex(token[2:])
        # Decide by position, not by value: a route may revisit the token it
        # ends on, and every hop except the last must be followed by its fee.
        if index != last_index:
            encoded += int.to_bytes(fees[index], 3, "big")

    return bytes(encoded)
def decode_path(full_path_encoded: bytes) -> list:
    """Decodes the path. A bit tricky. Thanks to https://degencode.substack.com/p/project-uniswapv3-mempool-watcher

    The encoded path alternates 20-byte token addresses and 3-byte fees,
    starting with an address.

    :param full_path_encoded: Encoded path as returned from router.decode_function_input (bytes)
    :returns: fully decoded path array including addresses and fees
    :raise AssertionError: if the input is not a bytes-like ``bytes``/``bytearray`` object
    :raise IndexError: if the input ends in the middle of an address or fee chunk
    """
    assert isinstance(full_path_encoded, (bytes, bytearray)), "encoded path must be provided as bytes"

    full_path_decoded = []
    total_length = len(full_path_encoded)
    path_pos = 0
    # read alternating 20 and 3 byte chunks from the encoded path,
    # store each address (checksummed hex) and fee (int)
    byte_length = 20

    while path_pos != total_length:
        chunk_end = path_pos + byte_length
        if chunk_end > total_length:
            # Leftover bytes that do not fill a whole chunk.
            raise IndexError(f"Bad path: {full_path_encoded}")

        chunk = full_path_encoded[path_pos:chunk_end]
        if byte_length == 20:
            full_path_decoded.append(Web3.to_checksum_address(chunk.hex()))
        else:
            # Fees are encoded as 3-byte big-endian unsigned integers.
            full_path_decoded.append(int.from_bytes(chunk, "big"))

        path_pos = chunk_end
        byte_length = 3 if byte_length == 20 else 20

    return full_path_decoded
def get_min_tick(fee: int) -> int:
    """Returns min tick for given fee.

    The result is ``MIN_TICK`` rounded up to a multiple of the tick spacing
    registered for ``fee`` in ``DEFAULT_TICK_SPACINGS``.

    Adapted from https://github.com/Uniswap/v3-periphery/blob/v1.0.0/test/shared/ticks.ts

    :param fee: Key into ``DEFAULT_TICK_SPACINGS``
    :return: Smallest multiple of the tick spacing that is not below ``MIN_TICK``
    """
    spacing: int = DEFAULT_TICK_SPACINGS[fee]
    # Number of whole spacings, rounded toward the inside of the valid range.
    spacing_count = math.ceil(MIN_TICK / spacing)
    return spacing_count * spacing
def get_max_tick(fee: int) -> int:
    """Returns max tick for given fee.

    The result is ``MAX_TICK`` rounded down to a multiple of the tick spacing
    registered for ``fee`` in ``DEFAULT_TICK_SPACINGS``.

    Adapted from https://github.com/Uniswap/v3-periphery/blob/v1.0.0/test/shared/ticks.ts

    :param fee: Key into ``DEFAULT_TICK_SPACINGS``
    :return: Largest multiple of the tick spacing that is not above ``MAX_TICK``
    """
    spacing: int = DEFAULT_TICK_SPACINGS[fee]
    # Number of whole spacings, rounded toward the inside of the valid range.
    spacing_count = math.floor(MAX_TICK / spacing)
    return spacing_count * spacing
def get_default_tick_range(fee: int) -> Tuple[int, int]:
    """Returns min and max tick for a given fee.

    This is used by default if the pool owner doesn't want to apply
    concentrated liquidity initially.

    :param fee: Fee passed through to ``get_min_tick`` and ``get_max_tick``
    :return: ``(min_tick, max_tick)`` tuple
    """
    return get_min_tick(fee), get_max_tick(fee)
def tick_to_price(tick):
    """Returns price corresponding to a tick.

    :param tick: Exponent applied to the base ``1.0001``
    :return: ``1.0001`` raised to the power of ``tick``
    """
    tick_base = 1.0001
    return tick_base**tick
def tick_to_sqrt_price(tick):
    """Returns square root price corresponding to a tick.

    Halving the tick before converting it to a price yields the square root
    of the price at the full tick.

    :param tick: Tick to convert
    :return: Result of ``tick_to_price`` evaluated at ``tick / 2``
    """
    half_tick = tick / 2
    return tick_to_price(half_tick)
def get_token0_amount_in_range(liquidity, sp, sb):
    """Returns token0 (base token) amount in a liquidity range.

    Evaluates ``liquidity * (sb - sp) / (sp * sb)``.

    This is derived formula based on: https://atiselsts.github.io/pdfs/uniswap-v3-liquidity-math.pdf

    :param liquidity: current virtual liquidity
    :param sp: square root current price
    :param sb: square root upper price
    :return: token0 amount
    """
    scaled_delta = liquidity * (sb - sp)
    price_product = sp * sb
    return scaled_delta / price_product
def get_token1_amount_in_range(liquidity, sp, sa):
    """Returns token1 (quote token) amount in a liquidity range.

    Evaluates ``liquidity * (sp - sa)``.

    This is derived formula based on: https://atiselsts.github.io/pdfs/uniswap-v3-liquidity-math.pdf

    :param liquidity: current virtual liquidity
    :param sp: square root current price
    :param sa: square root lower price
    :return: token1 amount
    """
    sqrt_price_delta = sp - sa
    return liquidity * sqrt_price_delta
def run_graphql_query(query: str, *, variables: Optional[dict] = None, api_url=UNISWAP_V3_SUBGRAPH_URL) -> dict:
    """Run query on Uniswap v3 subgraph.

    :param query: GraphQL query document as a string
    :param variables: Values for the query's variables; defaults to no variables
    :param api_url: GraphQL endpoint URL, ``UNISWAP_V3_SUBGRAPH_URL`` by default
    :return: The result of executing the query on the endpoint
    """
    # gql is imported at call time rather than at module import
    # (presumably so it is only required when a query is actually run).
    from gql import Client, gql
    from gql.transport.requests import RequestsHTTPTransport

    # Use a fresh dict per call instead of a shared mutable default argument.
    if variables is None:
        variables = {}

    transport = RequestsHTTPTransport(url=api_url, verify=True, retries=3)
    graphql_client = Client(transport=transport, fetch_schema_from_transport=True)
    return graphql_client.execute(gql(query), variable_values=variables)
def get_nearest_usable_tick(tick: int, fee: int):
    """Returns the nearest tick that is a multiple of the fee's tick spacing.

    ``tick`` is rounded to the nearest multiple of ``DEFAULT_TICK_SPACINGS[fee]``.
    If rounding lands outside the usable range returned by
    ``get_default_tick_range``, the result is moved one spacing back inside it.
    Exact ties follow Python's built-in ``round`` (round half to even).

    :param tick: Tick to align; must lie within ``MIN_TICK``..``MAX_TICK``
    :param fee: Key into ``DEFAULT_TICK_SPACINGS``
    :return: A multiple of the tick spacing within the usable tick range
    :raise AssertionError: if ``tick`` is outside ``MIN_TICK``..``MAX_TICK``
    """
    min_tick, max_tick = get_default_tick_range(fee)

    # Validate against the absolute bounds, not the spacing-aligned ones:
    # a tick between MIN_TICK and the first usable tick (or between the last
    # usable tick and MAX_TICK) is valid input, and is exactly the case the
    # adjustment branches below exist for.
    assert MIN_TICK <= tick <= MAX_TICK, "Tick out of bound"

    tick_spacing = DEFAULT_TICK_SPACINGS[fee]
    rounded = round(tick / tick_spacing) * tick_spacing

    if rounded < min_tick:
        return rounded + tick_spacing
    if rounded > max_tick:
        return rounded - tick_spacing
    return rounded