-
Notifications
You must be signed in to change notification settings - Fork 33
/
Copy pathoracle_module.py
142 lines (119 loc) · 5.88 KB
/
oracle_module.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import logging
import time
import traceback
from abc import abstractmethod, ABC
from dataclasses import asdict
from enum import Enum
from requests.exceptions import ConnectionError as RequestsConnectionError
from timeout_decorator import timeout, TimeoutError as DecoratorTimeoutError
from web3.exceptions import Web3Exception
from src.metrics.healthcheck_server import pulse
from src.metrics.prometheus.basic import ORACLE_BLOCK_NUMBER, ORACLE_SLOT_NUMBER
from src.modules.submodules.exceptions import IsNotMemberException, IncompatibleOracleVersion
from src.providers.http_provider import NotOkResponse
from src.providers.ipfs import IPFSError
from src.providers.keys.client import KeysOutdatedException
from src.utils.cache import clear_global_cache
from src.web3py.extensions.lido_validators import CountOfKeysDiffersException
from src.utils.blockstamp import build_blockstamp
from src.utils.slot import NoSlotsAvailable, SlotNotFinalized, InconsistentData
from src.web3py.types import Web3
from web3_multi_provider import NoActiveProviderError
from src import variables
from src.types import SlotNumber, BlockStamp, BlockRoot
logger = logging.getLogger(__name__)
class ModuleExecuteDelay(Enum):
"""Signals from execute_module method"""
NEXT_SLOT = 0
NEXT_FINALIZED_EPOCH = 1
class BaseModule(ABC):
"""
Base skeleton for Oracle modules.
Goals:
- Catch errors and log them.
- Raise exceptions that could not be proceeded automatically.
- Check Module didn't stick inside cycle forever.
"""
# This is reference mark for long sleep. Sleep until new finalized slot found.
_slot_threshold = SlotNumber(0)
def __init__(self, w3: Web3):
self.w3 = w3
def run_as_daemon(self):
logger.info({'msg': 'Run module as daemon.'})
while True:
logger.debug({'msg': 'Startup new cycle.'})
self.cycle_handler()
@timeout(variables.MAX_CYCLE_LIFETIME_IN_SECONDS)
def cycle_handler(self):
blockstamp = self._receive_last_finalized_slot()
if blockstamp.slot_number > self._slot_threshold:
if self.w3.lido_contracts.has_contract_address_changed():
clear_global_cache()
self.refresh_contracts()
result = self.run_cycle(blockstamp)
if result is ModuleExecuteDelay.NEXT_FINALIZED_EPOCH:
self._slot_threshold = blockstamp.slot_number
else:
logger.info({
'msg': 'Skipping the report. Wait for new finalized slot.',
'slot_threshold': self._slot_threshold,
})
logger.info({'msg': f'Cycle end. Sleep for {variables.CYCLE_SLEEP_IN_SECONDS} seconds.'})
time.sleep(variables.CYCLE_SLEEP_IN_SECONDS)
def _receive_last_finalized_slot(self) -> BlockStamp:
block_root = BlockRoot(self.w3.cc.get_block_root('finalized').root)
block_details = self.w3.cc.get_block_details(block_root)
bs = build_blockstamp(block_details)
logger.info({'msg': 'Fetch last finalized BlockStamp.', 'value': asdict(bs)})
ORACLE_SLOT_NUMBER.labels('finalized').set(bs.slot_number)
ORACLE_BLOCK_NUMBER.labels('finalized').set(bs.block_number)
return bs
def run_cycle(self, blockstamp: BlockStamp) -> ModuleExecuteDelay:
# pylint: disable=too-many-branches
logger.info({'msg': 'Execute module.', 'value': blockstamp})
try:
result = self.execute_module(blockstamp)
except IsNotMemberException as exception:
logger.error({'msg': 'Provided account is not part of Oracle`s committee.'})
raise exception
except IncompatibleOracleVersion as exception:
logger.error({'msg': 'Incompatible Contract version. Please update Oracle Daemon.'})
raise exception
except DecoratorTimeoutError as exception:
logger.error({'msg': 'Oracle module do not respond.', 'error': str(exception)})
except NoActiveProviderError as error:
logger.error({'msg': ''.join(traceback.format_exception(error))})
except RequestsConnectionError as error:
logger.error({'msg': 'Connection error.', 'error': str(error)})
except NotOkResponse as error:
logger.error({'msg': ''.join(traceback.format_exception(error))})
except (NoSlotsAvailable, SlotNotFinalized, InconsistentData) as error:
logger.error({'msg': 'Inconsistent response from consensus layer node.', 'error': str(error)})
except KeysOutdatedException as error:
logger.error({'msg': 'Keys API service returns outdated data.', 'error': str(error)})
except CountOfKeysDiffersException as error:
logger.error({'msg': 'Keys API service returned incorrect number of keys.', 'error': str(error)})
except Web3Exception as error:
logger.error({'msg': 'Web3py exception.', 'error': str(error)})
except IPFSError as error:
logger.error({'msg': 'IPFS provider error.', 'error': str(error)})
except ValueError as error:
logger.error({'msg': 'Unexpected error.', 'error': str(error)})
else:
# if there are no exceptions, then pulse
pulse()
return result
return ModuleExecuteDelay.NEXT_SLOT
@abstractmethod
def execute_module(self, last_finalized_blockstamp: BlockStamp) -> ModuleExecuteDelay:
"""
Implement module business logic here.
Return
ModuleExecuteDelay.NEXT_FINALIZED_EPOCH - to sleep until new finalized epoch
ModuleExecuteDelay.NEXT_SLOT - to sleep for a slot
"""
raise NotImplementedError('Module should implement this method.') # pragma: no cover
@abstractmethod
def refresh_contracts(self):
"""This method called if contracts addresses were changed"""
raise NotImplementedError('Module should implement this method.') # pragma: no cover