Source code for eth_defi.event_reader.web3worker
"""Web3 thread pool worker helpers."""
import logging
import threading
from typing import Optional
import futureproof
from web3 import Web3
from eth_defi.event_reader.logresult import LogContext
from eth_defi.event_reader.web3factory import Web3Factory
logger = logging.getLogger(__name__)
_thread_local_storage = threading.local()
[docs]def get_worker_web3() -> Web3:
"""Get the Web3 connection for the worker.
The connection was initialized when the worker thread was created.
"""
return _thread_local_storage.web3
[docs]def create_thread_pool_executor(factory: Web3Factory, context: Optional[LogContext] = None, max_workers=16) -> futureproof.ThreadPoolExecutor:
"""Create a thread pool executor.
All pool members have the thread locals initialized at start,
so that there is Web3 connection available.
:param factory:
The factory that provides web3 connection
for each threaad after a thread has been launched.
:param context:
Event reader context.
If you want to pass something extra for the event reader.
:param max_workers:
How many threads are allocated for futureproof pool.
"""
def init():
_thread_local_storage.web3 = factory(context)
logger.debug("Worker thread %d initialized", threading.get_ident())
executor = futureproof.ThreadPoolExecutor(max_workers=max_workers, initializer=init)
return executor