37 commits
0f45ec5
feat: separate performance collection and distribution
vgorkavenko Oct 17, 2025
962a076
Merge remote-tracking branch 'origin/csm-next' into feat/performance-…
vgorkavenko Oct 21, 2025
453effb
fix: lock
vgorkavenko Oct 21, 2025
485fe91
fix: black
vgorkavenko Oct 21, 2025
d9e5888
Merge remote-tracking branch 'origin/csm-next' into feat/performance-…
vgorkavenko Oct 22, 2025
8d86ca7
fix: already processed epochs
vgorkavenko Oct 22, 2025
9e2c951
feat: ChainConverter
vgorkavenko Oct 23, 2025
6e95c01
refactor: types
vgorkavenko Oct 27, 2025
46decb1
feat: better logging
vgorkavenko Oct 27, 2025
39d50d2
feat: additional validation
vgorkavenko Oct 27, 2025
d689979
refactor: db
vgorkavenko Oct 28, 2025
fa5ed3c
feat: `epochs_demand`
vgorkavenko Oct 29, 2025
a5d07f7
fix: `missing_epochs_in`
vgorkavenko Nov 4, 2025
1653943
fix: logic, logging
vgorkavenko Nov 4, 2025
aa871b3
fix: csm.execute_module
vgorkavenko Nov 4, 2025
2a70199
feat: add `_post` for http_provider
vgorkavenko Nov 4, 2025
db10267
fix: SafeBorder inheritance issue
vgorkavenko Nov 4, 2025
0925e5f
fix: remove TODOs
vgorkavenko Nov 4, 2025
84b2e47
feat: add `PERFORMANCE_COLLECTOR_DB_CONNECTION_TIMEOUT`
vgorkavenko Nov 4, 2025
b08865a
feat: use finalized epoch if no demands
vgorkavenko Nov 7, 2025
baf8c83
fix: `validate_state`
vgorkavenko Nov 10, 2025
e72cff2
WIP: `test_csm_module.py`
vgorkavenko Nov 10, 2025
1d2fb60
WIP: `test_csm_module.py`
vgorkavenko Nov 10, 2025
e9758b0
fix: linter
vgorkavenko Nov 10, 2025
f932b99
fix: `define_epochs_to_process_range`
vgorkavenko Nov 11, 2025
13330ea
fix: `define_epochs_to_process_range`. Simple AI tests
vgorkavenko Nov 11, 2025
84ff68e
fix: remove `DEFAULT_EPOCHS_STEP_TO_COLLECT`
vgorkavenko Nov 12, 2025
6adb0ae
fix: review
vgorkavenko Nov 14, 2025
cc080b3
fix: review
vgorkavenko Nov 28, 2025
1203448
fix: remove old tests
vgorkavenko Nov 28, 2025
3efa8b9
fix: imports in test
vgorkavenko Nov 28, 2025
90c790e
fix: test_csm_module
vgorkavenko Nov 28, 2025
d95584f
fix: linter
vgorkavenko Dec 2, 2025
f676fa0
fix: some tests
vgorkavenko Dec 2, 2025
9c1c902
fix: fork tests
vgorkavenko Dec 2, 2025
53aac86
fix: review
vgorkavenko Dec 11, 2025
a8732eb
fix: errors
vgorkavenko Dec 12, 2025
486 changes: 344 additions & 142 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions pyproject.toml
@@ -28,6 +28,8 @@ web3-multi-provider = { version = "^2.2.1", extras = ["metrics"] }
json-stream = "^2.3.2"
oz-merkle-tree = { git = "https://github.com/lidofinance/oz-merkle-tree" }
py-multiformats-cid = "^0.4.4"
flask = "^3.0.0"
waitress = "^3.0.2"

[tool.poetry.group.dev.dependencies]
base58 = "^2.1.1"
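The flask and waitress additions presumably back the HTTP API of the new Performance Collector module (Flask for the application, waitress as the WSGI server). A minimal sketch of how the two are typically paired; the endpoint, payload, and port below are illustrative assumptions, not code from this PR:

# Hypothetical sketch only: Flask defines the app, waitress serves it.
from flask import Flask, jsonify
from waitress import serve

app = Flask(__name__)

@app.get("/epochs/<int:epoch>")
def get_epoch(epoch: int):
    # An endpoint like this could expose collected per-epoch performance data.
    return jsonify({"epoch": epoch, "status": "collected"})

if __name__ == "__main__":
    # waitress replaces Flask's built-in development server for production use.
    serve(app, host="127.0.0.1", port=8080)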
13 changes: 11 additions & 2 deletions src/main.py
@@ -13,6 +13,7 @@
from src.modules.checks.checks_module import ChecksModule
from src.modules.csm.csm import CSOracle
from src.modules.ejector.ejector import Ejector
from src.modules.performance_collector.performance_collector import PerformanceCollector
from src.providers.ipfs import IPFSProvider, Kubo, MultiIPFSProvider, Pinata, PublicIPFS
from src.types import OracleModule
from src.utils.build import get_build_info
@@ -27,6 +28,7 @@
LidoValidatorsProvider,
TransactionUtils,
)
from src.web3py.extensions.performance import PerformanceClientModule
from src.web3py.types import Web3

logger = logging.getLogger(__name__)
@@ -73,6 +75,9 @@ def main(module_name: OracleModule):
retries=variables.HTTP_REQUEST_RETRY_COUNT_IPFS,
)

logger.info({'msg': 'Initialize Performance Collector client.'})
performance = PerformanceClientModule(variables.PERFORMANCE_COLLECTOR_URI)

logger.info({'msg': 'Check configured providers.'})
if Version(kac.get_status().appVersion) < constants.ALLOWED_KAPI_VERSION:
raise IncompatibleException(f'Incompatible KAPI version. Required >= {constants.ALLOWED_KAPI_VERSION}.')
@@ -87,12 +92,13 @@ def main(module_name: OracleModule):
'cc': lambda: cc, # type: ignore[dict-item]
'kac': lambda: kac, # type: ignore[dict-item]
'ipfs': lambda: ipfs, # type: ignore[dict-item]
'performance': lambda: performance, # type: ignore[dict-item]
})

logger.info({'msg': 'Initialize prometheus metrics.'})
init_metrics()

instance: Accounting | Ejector | CSOracle
instance: Accounting | Ejector | CSOracle | PerformanceCollector
if module_name == OracleModule.ACCOUNTING:
logger.info({'msg': 'Initialize Accounting module.'})
instance = Accounting(web3)
@@ -102,10 +108,13 @@ def main(module_name: OracleModule):
elif module_name == OracleModule.CSM:
logger.info({'msg': 'Initialize CSM performance oracle module.'})
instance = CSOracle(web3)
elif module_name == OracleModule.PERFORMANCE_COLLECTOR:
instance = PerformanceCollector(web3)
else:
raise ValueError(f'Unexpected arg: {module_name=}.')

instance.check_contract_configs()
if module_name != OracleModule.PERFORMANCE_COLLECTOR:
instance.check_contract_configs()

if variables.DAEMON:
instance.run_as_daemon()
8 changes: 8 additions & 0 deletions src/metrics/prometheus/basic.py
@@ -68,6 +68,14 @@ class Status(Enum):
buckets=requests_buckets,
)

PERFORMANCE_REQUESTS_DURATION = Histogram(
'performance_requests_duration',
'Duration of requests to Performance Collector API',
['endpoint', 'code', 'domain'],
namespace=PROMETHEUS_PREFIX,
buckets=requests_buckets,
)

KEYS_API_REQUESTS_DURATION = Histogram(
'keys_api_requests_duration',
'Duration of requests to Keys API',
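For context, a labeled duration histogram like PERFORMANCE_REQUESTS_DURATION above is typically observed around each HTTP call. A rough usage sketch; the wrapper, the endpoint and domain values, and the requests-style response object are assumptions, not code from this PR:

import time

from src.metrics.prometheus.basic import PERFORMANCE_REQUESTS_DURATION

def observe_request(endpoint: str, domain: str, send):
    # Time the call and record it under the endpoint / status code / domain labels.
    start = time.perf_counter()
    response = send()
    PERFORMANCE_REQUESTS_DURATION.labels(
        endpoint=endpoint, code=response.status_code, domain=domain
    ).observe(time.perf_counter() - start)
    return response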
165 changes: 74 additions & 91 deletions src/modules/csm/csm.py
@@ -9,7 +9,6 @@
CSM_CURRENT_FRAME_RANGE_R_EPOCH,
)
from src.metrics.prometheus.duration_meter import duration_meter
from src.modules.csm.checkpoint import FrameCheckpointProcessor, FrameCheckpointsIterator, MinStepIsNotReached
from src.modules.csm.distribution import Distribution, DistributionResult, StrikesValidator
from src.modules.csm.helpers.last_report import LastReport
from src.modules.csm.log import FramePerfLog
@@ -27,8 +26,11 @@
EpochNumber,
ReferenceBlockStamp,
SlotNumber,
ValidatorIndex,
)
from src.utils.cache import global_lru_cache as lru_cache
from src.utils.range import sequence
from src.utils.validator_state import is_active_validator
from src.utils.web3converter import Web3Converter
from src.web3py.extensions.lido_validators import NodeOperatorId
from src.web3py.types import Web3
@@ -69,24 +71,46 @@ def execute_module(self, last_finalized_blockstamp: BlockStamp) -> ModuleExecuteDelay:
if not self._check_compatability(last_finalized_blockstamp):
return ModuleExecuteDelay.NEXT_FINALIZED_EPOCH

collected = self.collect_data(last_finalized_blockstamp)
if not collected:
logger.info(
{"msg": "Data required for the report is not fully collected yet. Waiting for the next finalized epoch"}
)
return ModuleExecuteDelay.NEXT_FINALIZED_EPOCH

report_blockstamp = self.get_blockstamp_for_report(last_finalized_blockstamp)
if not report_blockstamp:
return ModuleExecuteDelay.NEXT_FINALIZED_EPOCH

collected = self.collect_data(report_blockstamp)
if not collected:
return ModuleExecuteDelay.NEXT_FINALIZED_EPOCH

self.process_report(report_blockstamp)
return ModuleExecuteDelay.NEXT_SLOT

@duration_meter()
def collect_data(self, blockstamp: ReferenceBlockStamp) -> bool:
logger.info({"msg": "Collecting data for the report"})

converter = self.converter(blockstamp)

l_epoch, r_epoch = self.get_epochs_range_to_process(blockstamp)
logger.info({"msg": f"Epochs range for performance data collect: [{l_epoch};{r_epoch}]"})

self.state.migrate(l_epoch, r_epoch, converter.frame_config.epochs_per_frame)
self.state.log_progress()

if not self.state.is_fulfilled:
for l_epoch_, r_epoch_ in self.state.frames:
is_data_range_available = self.w3.performance.is_range_available(
l_epoch_, r_epoch_
)
if not is_data_range_available:
logger.warning({"msg": f"Performance data range is not available yet for [{l_epoch_};{r_epoch_}] frame"})
return False
self.fulfill_state()

return self.state.is_fulfilled

@lru_cache(maxsize=1)
@duration_meter()
def build_report(self, blockstamp: ReferenceBlockStamp) -> tuple:
self.validate_state(blockstamp)
l_epoch, r_epoch = self.get_epochs_range_to_process(blockstamp)
self.state.validate(l_epoch, r_epoch)

last_report = self._get_last_report(blockstamp)
rewards_tree_root, rewards_cid = last_report.rewards_tree_root, last_report.rewards_tree_cid
@@ -145,74 +169,43 @@ def is_reporting_allowed(self, blockstamp: ReferenceBlockStamp) -> bool:
CONTRACT_ON_PAUSE.labels("csm").set(on_pause)
return not on_pause

def validate_state(self, blockstamp: ReferenceBlockStamp) -> None:
# NOTE: We cannot use `r_epoch` from the `current_frame_range` call because the `blockstamp` is a
# `ReferenceBlockStamp`, hence it's a block the frame ends at. We use `ref_epoch` instead.
l_epoch, _ = self.get_epochs_range_to_process(blockstamp)
r_epoch = blockstamp.ref_epoch

self.state.validate(l_epoch, r_epoch)

def collect_data(self, blockstamp: BlockStamp) -> bool:
"""Ongoing report data collection for the estimated reference slot"""

logger.info({"msg": "Collecting data for the report"})

converter = self.converter(blockstamp)

l_epoch, r_epoch = self.get_epochs_range_to_process(blockstamp)
logger.info({"msg": f"Epochs range for performance data collect: [{l_epoch};{r_epoch}]"})

# NOTE: Finalized slot is the first slot of justifying epoch, so we need to take the previous. But if the first
# slot of the justifying epoch is empty, blockstamp.slot_number will point to the slot where the last finalized
# block was created. As a result, finalized_epoch in this case will be less than the actual number of the last
# finalized epoch. As a result we can have a delay in frame finalization.
finalized_epoch = EpochNumber(converter.get_epoch_by_slot(blockstamp.slot_number) - 1)

report_blockstamp = self.get_blockstamp_for_report(blockstamp)

if not report_blockstamp:
logger.info({"msg": "No report blockstamp available, using pre-computed one for collecting data"})

if report_blockstamp and report_blockstamp.ref_epoch != r_epoch:
logger.warning(
{
"msg": f"Epochs range has been changed, but the change is not yet observed on finalized epoch {finalized_epoch}"
}
)
return False

if l_epoch > finalized_epoch:
logger.info({"msg": "The starting epoch of the epochs range is not finalized yet"})
return False

self.state.migrate(l_epoch, r_epoch, converter.frame_config.epochs_per_frame)
self.state.log_progress()

if self.state.is_fulfilled:
logger.info({"msg": "All epochs are already processed. Nothing to collect"})
return True

try:
checkpoints = FrameCheckpointsIterator(
converter,
min(self.state.unprocessed_epochs),
r_epoch,
finalized_epoch,
)
except MinStepIsNotReached:
return False

processor = FrameCheckpointProcessor(self.w3.cc, self.state, converter, blockstamp)

for checkpoint in checkpoints:
if self.get_epochs_range_to_process(self._receive_last_finalized_slot()) != (l_epoch, r_epoch):
logger.info({"msg": "Checkpoints were prepared for an outdated epochs range, stop processing"})
raise ValueError("Outdated checkpoint")
processor.exec(checkpoint)
# Reset BaseOracle cycle timeout to avoid timeout errors during long checkpoints processing
self._reset_cycle_timeout()
return self.state.is_fulfilled
def fulfill_state(self):
finalized_blockstamp = self._receive_last_finalized_slot()
validators = self.w3.cc.get_validators(finalized_blockstamp)

for l_epoch, r_epoch in self.state.frames:
Review comment (Contributor):
How can we be sure that self.state.frames is not empty? It looks like fulfill_state has some prerequisites that must hold before it runs. If so, it would be better to check those prerequisites and raise an exception when they are not met.
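A guard along these lines would make that explicit; a minimal sketch, assuming self.state.frames is populated by state.migrate() earlier in collect_data (the error wording is illustrative):

# Illustrative suggestion only: fail fast if fulfill_state() runs before state.migrate().
if not self.state.frames:
    raise ValueError("State has no frames to fill; call state.migrate() before fulfill_state()")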

for epoch in sequence(l_epoch, r_epoch):
epoch_data = self.w3.performance.get_epoch(epoch)
if epoch_data is None:
raise ValueError(f"Epoch {epoch} is missing in Performance Collector")
misses, props, syncs = epoch_data

for validator in validators:
missed_att = validator.index in misses
included_att = validator.index not in misses
is_active = is_active_validator(validator, EpochNumber(epoch))
if not is_active and missed_att:
raise ValueError(f"Validator {validator.index} missed attestation in epoch {epoch}, but was not active")

self.state.save_att_duty(EpochNumber(epoch), validator.index, included=included_att)

blocks_in_epoch = 0
for p in props:
vid = ValidatorIndex(p.validator_index)
self.state.save_prop_duty(EpochNumber(epoch), vid, included=bool(p.is_proposed))
blocks_in_epoch += p.is_proposed

if blocks_in_epoch and syncs:
for rec in syncs:
vid = ValidatorIndex(rec.validator_index)
fulfilled = max(0, blocks_in_epoch - rec.missed_count)
for _ in range(fulfilled):
self.state.save_sync_duty(EpochNumber(epoch), vid, included=True)
for _ in range(rec.missed_count):
self.state.save_sync_duty(EpochNumber(epoch), vid, included=False)

self.state.add_processed_epoch(EpochNumber(epoch))
self.state.log_progress()

def make_rewards_tree(self, shares: dict[NodeOperatorId, RewardsShares]) -> RewardsTree:
if not shares:
@@ -250,43 +243,33 @@ def publish_log(self, logs: list[FramePerfLog]) -> CID:
return log_cid

@lru_cache(maxsize=1)
def get_epochs_range_to_process(self, blockstamp: BlockStamp) -> tuple[EpochNumber, EpochNumber]:
def get_epochs_range_to_process(self, blockstamp: ReferenceBlockStamp) -> tuple[EpochNumber, EpochNumber]:
converter = self.converter(blockstamp)

far_future_initial_epoch = converter.get_epoch_by_timestamp(UINT64_MAX)
if converter.frame_config.initial_epoch == far_future_initial_epoch:
raise ValueError("CSM oracle initial epoch is not set yet")

l_ref_slot = last_processing_ref_slot = self.w3.csm.get_csm_last_processing_ref_slot(blockstamp)
r_ref_slot = initial_ref_slot = self.get_initial_ref_slot(blockstamp)

if last_processing_ref_slot > blockstamp.slot_number:
raise InconsistentData(f"{last_processing_ref_slot=} > {blockstamp.slot_number=}")

# The very first report, no previous ref slot.
if not last_processing_ref_slot:
initial_ref_slot = self.get_initial_ref_slot(blockstamp)
l_ref_slot = SlotNumber(initial_ref_slot - converter.slots_per_frame)
if l_ref_slot < 0:
raise CSMError("Invalid frame configuration for the current network")

# NOTE: before the initial slot the contract can't return current frame
if blockstamp.slot_number > initial_ref_slot:
r_ref_slot = self.get_initial_or_current_frame(blockstamp).ref_slot

# We are between reports, next report slot didn't happen yet. Predicting the next ref slot for the report
# to calculate epochs range to collect the data.
if l_ref_slot == r_ref_slot:
r_ref_slot = converter.get_epoch_last_slot(
EpochNumber(converter.get_epoch_by_slot(l_ref_slot) + converter.frame_config.epochs_per_frame)
)

r_ref_slot = blockstamp.slot_number
if l_ref_slot < last_processing_ref_slot:
raise CSMError(f"Got invalid epochs range: {l_ref_slot=} < {last_processing_ref_slot=}")
if l_ref_slot >= r_ref_slot:
raise CSMError(f"Got invalid epochs range {r_ref_slot=}, {l_ref_slot=}")

l_epoch = converter.get_epoch_by_slot(SlotNumber(l_ref_slot + 1))
r_epoch = converter.get_epoch_by_slot(r_ref_slot)
r_epoch = blockstamp.ref_epoch

# Update Prometheus metrics
CSM_CURRENT_FRAME_RANGE_L_EPOCH.set(l_epoch)