-
Notifications
You must be signed in to change notification settings - Fork 34
/
Copy pathmain.py
175 lines (138 loc) · 5.87 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
import sys
from typing import Iterator, cast
from packaging.version import Version
from prometheus_client import start_http_server
from web3.middleware import simple_cache_middleware
from src import variables
from src.metrics.healthcheck_server import start_pulse_server
from src.metrics.logging import logging
from src.metrics.prometheus.basic import ENV_VARIABLES_INFO, BUILD_INFO
from src.modules.accounting.accounting import Accounting
from src.modules.ejector.ejector import Ejector
from src.modules.checks.checks_module import ChecksModule
from src.modules.csm.csm import CSOracle
from src.providers.ipfs import GW3, IPFSProvider, MultiIPFSProvider, Pinata, PublicIPFS
from src.types import OracleModule
from src.utils.build import get_build_info
from src.utils.exception import IncompatibleException
from src.web3py.extensions import (
LidoContracts,
TransactionUtils,
ConsensusClientModule,
KeysAPIClientModule,
LidoValidatorsProvider,
FallbackProviderModule,
LazyCSM,
)
from src.web3py.middleware import metrics_collector
from src.web3py.types import Web3
from src.web3py.contract_tweak import tweak_w3_contracts
logger = logging.getLogger(__name__)
def main(module_name: OracleModule):
build_info = get_build_info()
logger.info({
'msg': 'Oracle startup.',
'variables': {
**build_info,
'module': module_name,
**variables.PUBLIC_ENV_VARS,
},
})
ENV_VARIABLES_INFO.info(variables.PUBLIC_ENV_VARS)
BUILD_INFO.info(build_info)
logger.info({'msg': f'Start healthcheck server for Docker container on port {variables.HEALTHCHECK_SERVER_PORT}'})
start_pulse_server()
logger.info({'msg': f'Start http server with prometheus metrics on port {variables.PROMETHEUS_PORT}'})
start_http_server(variables.PROMETHEUS_PORT)
logger.info({'msg': 'Initialize multi web3 provider.'})
web3 = Web3(FallbackProviderModule(
variables.EXECUTION_CLIENT_URI,
request_kwargs={'timeout': variables.HTTP_REQUEST_TIMEOUT_EXECUTION}
))
logger.info({'msg': 'Modify web3 with custom contract function call.'})
tweak_w3_contracts(web3)
logger.info({'msg': 'Initialize consensus client.'})
cc = ConsensusClientModule(variables.CONSENSUS_CLIENT_URI, web3)
logger.info({'msg': 'Initialize keys api client.'})
kac = KeysAPIClientModule(variables.KEYS_API_URI, web3)
logger.info({'msg': 'Initialize IPFS providers.'})
ipfs = MultiIPFSProvider(
ipfs_providers(),
retries=variables.HTTP_REQUEST_RETRY_COUNT_IPFS,
)
logger.info({'msg': 'Check configured providers.'})
if Version(kac.get_status().appVersion) < Version('1.5.0'):
raise IncompatibleException('Incompatible KAPI version. Required >= 1.5.0.')
check_providers_chain_ids(web3, cc, kac)
web3.attach_modules({
'lido_contracts': LidoContracts,
'lido_validators': LidoValidatorsProvider,
'transaction': TransactionUtils,
'csm': LazyCSM,
'cc': lambda: cc, # type: ignore[dict-item]
'kac': lambda: kac, # type: ignore[dict-item]
'ipfs': lambda: ipfs, # type: ignore[dict-item]
})
logger.info({'msg': 'Add metrics middleware for ETH1 requests.'})
web3.middleware_onion.add(metrics_collector)
web3.middleware_onion.add(simple_cache_middleware)
logger.info({'msg': 'Sanity checks.'})
instance: Accounting | Ejector | CSOracle
if module_name == OracleModule.ACCOUNTING:
logger.info({'msg': 'Initialize Accounting module.'})
instance = Accounting(web3)
elif module_name == OracleModule.EJECTOR:
logger.info({'msg': 'Initialize Ejector module.'})
instance = Ejector(web3)
elif module_name == OracleModule.CSM:
logger.info({'msg': 'Initialize CSM performance oracle module.'})
instance = CSOracle(web3)
else:
raise ValueError(f'Unexpected arg: {module_name=}.')
instance.check_contract_configs()
if variables.DAEMON:
instance.run_as_daemon()
else:
instance.cycle_handler()
def check():
logger.info({'msg': 'Check oracle is ready to work in the current environment.'})
return ChecksModule().execute_module()
def check_providers_chain_ids(web3: Web3, cc: ConsensusClientModule, kac: KeysAPIClientModule):
keys_api_chain_id = kac.check_providers_consistency()
consensus_chain_id = cc.check_providers_consistency()
execution_chain_id = cast(FallbackProviderModule, web3.provider).check_providers_consistency()
if execution_chain_id == consensus_chain_id == keys_api_chain_id:
return
raise IncompatibleException(
'Different chain ids detected:\n'
f'Execution chain id: {execution_chain_id}\n'
f'Consensus chain id: {consensus_chain_id}\n'
f'Keys API chain id: {keys_api_chain_id}\n'
)
def ipfs_providers() -> Iterator[IPFSProvider]:
if variables.GW3_ACCESS_KEY and variables.GW3_SECRET_KEY:
yield GW3(
variables.GW3_ACCESS_KEY,
variables.GW3_SECRET_KEY,
timeout=variables.HTTP_REQUEST_TIMEOUT_IPFS,
)
if variables.PINATA_JWT:
yield Pinata(
variables.PINATA_JWT,
timeout=variables.HTTP_REQUEST_TIMEOUT_IPFS,
)
yield PublicIPFS(timeout=variables.HTTP_REQUEST_TIMEOUT_IPFS)
if __name__ == '__main__':
module_name_arg = sys.argv[-1]
if module_name_arg not in iter(OracleModule):
msg = f'Last arg should be one of {[str(item) for item in OracleModule]}, received {module_name_arg}.'
logger.error({'msg': msg})
raise ValueError(msg)
module = OracleModule(module_name_arg)
if module == OracleModule.CHECK:
errors = variables.check_uri_required_variables()
variables.raise_from_errors(errors)
sys.exit(check())
errors = variables.check_all_required_variables()
variables.raise_from_errors(errors)
main(module)