-
Notifications
You must be signed in to change notification settings - Fork 45
[WIP] feat: separate performance collection and distribution #802
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: csm-next
Are you sure you want to change the base?
Changes from 27 commits
0f45ec5
962a076
453effb
485fe91
d9e5888
8d86ca7
9e2c951
6e95c01
46decb1
39d50d2
d689979
fa5ed3c
a5d07f7
1653943
aa871b3
2a70199
db10267
0925e5f
84b2e47
b08865a
baf8c83
e72cff2
1d2fb60
e9758b0
f932b99
13330ea
84ff68e
6adb0ae
cc080b3
1203448
3efa8b9
90c790e
d95584f
f676fa0
9c1c902
53aac86
a8732eb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,16 +9,7 @@ | |
| CSM_CURRENT_FRAME_RANGE_R_EPOCH, | ||
| ) | ||
| from src.metrics.prometheus.duration_meter import duration_meter | ||
| from src.modules.csm.checkpoint import ( | ||
| FrameCheckpointProcessor, | ||
| FrameCheckpointsIterator, | ||
| MinStepIsNotReached, | ||
| ) | ||
| from src.modules.csm.distribution import ( | ||
| Distribution, | ||
| DistributionResult, | ||
| StrikesValidator, | ||
| ) | ||
| from src.modules.csm.distribution import Distribution, DistributionResult, StrikesValidator | ||
| from src.modules.csm.helpers.last_report import LastReport | ||
| from src.modules.csm.log import FramePerfLog | ||
| from src.modules.csm.state import State | ||
|
|
@@ -35,8 +26,11 @@ | |
| EpochNumber, | ||
| ReferenceBlockStamp, | ||
| SlotNumber, | ||
| ValidatorIndex, | ||
| ) | ||
| from src.utils.cache import global_lru_cache as lru_cache | ||
| from src.utils.range import sequence | ||
| from src.utils.validator_state import is_active_validator | ||
| from src.utils.web3converter import Web3Converter | ||
| from src.web3py.extensions.lido_validators import NodeOperatorId | ||
| from src.web3py.types import Web3 | ||
|
|
@@ -77,20 +71,63 @@ def execute_module(self, last_finalized_blockstamp: BlockStamp) -> ModuleExecute | |
| if not self._check_compatability(last_finalized_blockstamp): | ||
| return ModuleExecuteDelay.NEXT_FINALIZED_EPOCH | ||
|
|
||
| collected = self.collect_data(last_finalized_blockstamp) | ||
| if not collected: | ||
| logger.info( | ||
| {"msg": "Data required for the report is not fully collected yet. Waiting for the next finalized epoch"} | ||
| ) | ||
| return ModuleExecuteDelay.NEXT_FINALIZED_EPOCH | ||
| self.set_epochs_range_to_collect(last_finalized_blockstamp) | ||
|
|
||
| report_blockstamp = self.get_blockstamp_for_report(last_finalized_blockstamp) | ||
| if not report_blockstamp: | ||
| return ModuleExecuteDelay.NEXT_FINALIZED_EPOCH | ||
|
|
||
| collected = self.collect_data() | ||
| if not collected: | ||
| return ModuleExecuteDelay.NEXT_FINALIZED_EPOCH | ||
|
|
||
| self.process_report(report_blockstamp) | ||
| return ModuleExecuteDelay.NEXT_SLOT | ||
|
|
||
| @duration_meter() | ||
| def set_epochs_range_to_collect(self, blockstamp: BlockStamp): | ||
| consumer = self.__class__.__name__ | ||
| converter = self.converter(blockstamp) | ||
|
|
||
| l_epoch, r_epoch = self.get_epochs_range_to_process(blockstamp) | ||
| self.state.migrate(l_epoch, r_epoch, converter.frame_config.epochs_per_frame) | ||
| self.state.log_progress() | ||
|
|
||
| current_demands = self.w3.performance.get_epochs_demand() | ||
| current_demand = current_demands.get(consumer) | ||
| if current_demand != (l_epoch, r_epoch): | ||
| logger.info({ | ||
| "msg": f"Updating {consumer} epochs demand for Performance Collector", | ||
| "old": current_demand, | ||
| "new": (l_epoch, r_epoch) | ||
| }) | ||
| self.w3.performance.post_epochs_demand(consumer, l_epoch, r_epoch) | ||
|
|
||
| @duration_meter() | ||
| def collect_data(self) -> bool: | ||
| logger.info({"msg": "Collecting data for the report from Performance Collector"}) | ||
|
|
||
| if not self.state.is_fulfilled: | ||
| for l_epoch, r_epoch in self.state.frames: | ||
| is_data_range_available = self.w3.performance.is_range_available( | ||
| l_epoch, r_epoch | ||
| ) | ||
| if not is_data_range_available: | ||
| logger.warning({ | ||
| "msg": "Performance data range is not available yet", | ||
| "start_epoch": l_epoch, | ||
| "end_epoch": r_epoch | ||
| }) | ||
| return False | ||
| logger.info({ | ||
| "msg": "Performance data range is available", | ||
| "start_epoch": l_epoch, | ||
| "end_epoch": r_epoch | ||
| }) | ||
| self.fulfill_state() | ||
|
|
||
| return self.state.is_fulfilled | ||
|
|
||
| @lru_cache(maxsize=1) | ||
| @duration_meter() | ||
| def build_report(self, blockstamp: ReferenceBlockStamp) -> tuple: | ||
|
|
@@ -162,66 +199,73 @@ def validate_state(self, blockstamp: ReferenceBlockStamp) -> None: | |
|
|
||
| self.state.validate(l_epoch, r_epoch) | ||
|
|
||
| def collect_data(self, blockstamp: BlockStamp) -> bool: | ||
| """Ongoing report data collection for the estimated reference slot""" | ||
|
|
||
| logger.info({"msg": "Collecting data for the report"}) | ||
|
|
||
| converter = self.converter(blockstamp) | ||
|
|
||
| l_epoch, r_epoch = self.get_epochs_range_to_process(blockstamp) | ||
| logger.info({"msg": f"Epochs range for performance data collect: [{l_epoch};{r_epoch}]"}) | ||
|
|
||
| # NOTE: Finalized slot is the first slot of justifying epoch, so we need to take the previous. But if the first | ||
| # slot of the justifying epoch is empty, blockstamp.slot_number will point to the slot where the last finalized | ||
| # block was created. As a result, finalized_epoch in this case will be less than the actual number of the last | ||
| # finalized epoch. As a result we can have a delay in frame finalization. | ||
| finalized_epoch = EpochNumber(converter.get_epoch_by_slot(blockstamp.slot_number) - 1) | ||
|
|
||
| report_blockstamp = self.get_blockstamp_for_report(blockstamp) | ||
|
|
||
| if not report_blockstamp: | ||
| logger.info({"msg": "No report blockstamp available, using pre-computed one for collecting data"}) | ||
|
|
||
| if report_blockstamp and report_blockstamp.ref_epoch != r_epoch: | ||
| logger.warning( | ||
| { | ||
| "msg": f"Epochs range has been changed, but the change is not yet observed on finalized epoch {finalized_epoch}" | ||
| } | ||
| ) | ||
| return False | ||
|
|
||
| if l_epoch > finalized_epoch: | ||
| logger.info({"msg": "The starting epoch of the epochs range is not finalized yet"}) | ||
| return False | ||
|
|
||
| self.state.migrate(l_epoch, r_epoch, converter.frame_config.epochs_per_frame) | ||
| self.state.log_progress() | ||
|
|
||
| if self.state.is_fulfilled: | ||
| logger.info({"msg": "All epochs are already processed. Nothing to collect"}) | ||
| return True | ||
|
|
||
| try: | ||
| checkpoints = FrameCheckpointsIterator( | ||
| converter, | ||
| min(self.state.unprocessed_epochs), | ||
| r_epoch, | ||
| finalized_epoch, | ||
| ) | ||
| except MinStepIsNotReached: | ||
| return False | ||
|
|
||
| processor = FrameCheckpointProcessor(self.w3.cc, self.state, converter, blockstamp) | ||
|
|
||
| for checkpoint in checkpoints: | ||
| if self.get_epochs_range_to_process(self._receive_last_finalized_slot()) != (l_epoch, r_epoch): | ||
| logger.info({"msg": "Checkpoints were prepared for an outdated epochs range, stop processing"}) | ||
| raise ValueError("Outdated checkpoint") | ||
| processor.exec(checkpoint) | ||
| # Reset BaseOracle cycle timeout to avoid timeout errors during long checkpoints processing | ||
| self._reset_cycle_timeout() | ||
| return self.state.is_fulfilled | ||
| def fulfill_state(self): | ||
| finalized_blockstamp = self._receive_last_finalized_slot() | ||
| validators = self.w3.cc.get_validators(finalized_blockstamp) | ||
|
|
||
| logger.info({ | ||
| "msg": "Starting state fulfillment", | ||
| "total_frames": len(self.state.frames), | ||
| "total_validators": len(validators) | ||
| }) | ||
|
|
||
| for l_epoch, r_epoch in self.state.frames: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. How to understand that |
||
| logger.info({ | ||
| "msg": "Processing frame", | ||
| "start_epoch": l_epoch, | ||
| "end_epoch": r_epoch, | ||
| "total_epochs": r_epoch - l_epoch + 1 | ||
| }) | ||
|
|
||
| for epoch in sequence(l_epoch, r_epoch): | ||
| if epoch not in self.state.unprocessed_epochs: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The same here |
||
| logger.debug({"msg": f"Epoch {epoch} is already processed"}) | ||
| continue | ||
|
|
||
| logger.info({ | ||
| "msg": "Requesting performance data from collector", | ||
| "epoch": epoch | ||
| }) | ||
| epoch_data = self.w3.performance.get_epoch(epoch) | ||
| if epoch_data is None: | ||
| logger.warning({"msg": f"Epoch {epoch} is missing in Performance Collector"}) | ||
| continue | ||
vgorkavenko marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| misses, props, syncs = epoch_data | ||
| logger.info({ | ||
| "msg": "Performance data received", | ||
| "epoch": epoch, | ||
| "misses_count": len(misses), | ||
| "proposals_count": len(props), | ||
| "sync_duties_count": len(syncs) | ||
| }) | ||
|
|
||
| for validator in validators: | ||
| missed_att = validator.index in misses | ||
| included_att = validator.index not in misses | ||
| is_active = is_active_validator(validator, EpochNumber(epoch)) | ||
| if not is_active and missed_att: | ||
| raise ValueError(f"Validator {validator.index} missed attestation in epoch {epoch}, but was not active") | ||
| self.state.save_att_duty(EpochNumber(epoch), validator.index, included=included_att) | ||
|
|
||
| blocks_in_epoch = 0 | ||
|
|
||
| for p in props: | ||
| vid = ValidatorIndex(p.validator_index) | ||
| self.state.save_prop_duty(EpochNumber(epoch), vid, included=bool(p.is_proposed)) | ||
| blocks_in_epoch += p.is_proposed | ||
|
|
||
| if blocks_in_epoch: | ||
| for rec in syncs: | ||
| vid = ValidatorIndex(rec.validator_index) | ||
| fulfilled = max(0, blocks_in_epoch - rec.missed_count) | ||
| for _ in range(fulfilled): | ||
| self.state.save_sync_duty(EpochNumber(epoch), vid, included=True) | ||
| for _ in range(rec.missed_count): | ||
| self.state.save_sync_duty(EpochNumber(epoch), vid, included=False) | ||
|
|
||
| self.state.add_processed_epoch(EpochNumber(epoch)) | ||
| self.state.log_progress() | ||
|
|
||
| def make_rewards_tree(self, shares: dict[NodeOperatorId, RewardsShares]) -> RewardsTree: | ||
| if not shares: | ||
|
|
@@ -301,6 +345,12 @@ def get_epochs_range_to_process(self, blockstamp: BlockStamp) -> tuple[EpochNumb | |
| CSM_CURRENT_FRAME_RANGE_L_EPOCH.set(l_epoch) | ||
| CSM_CURRENT_FRAME_RANGE_R_EPOCH.set(r_epoch) | ||
|
|
||
| logger.info({ | ||
| "msg": "Epochs range for the report", | ||
| "l_epoch": l_epoch, | ||
| "r_epoch": r_epoch | ||
| }) | ||
|
|
||
| return l_epoch, r_epoch | ||
|
|
||
| def converter(self, blockstamp: BlockStamp) -> Web3Converter: | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.