[WIP] feat: separate performance collection and distribution #802
Draft: vgorkavenko wants to merge 37 commits into csm-next from feat/performance-collector
Commits (37; changes shown from 9 of them):
- `0f45ec5` feat: separate performance collection and distribution
- `962a076` Merge remote-tracking branch 'origin/csm-next' into feat/performance-…
- `453effb` fix: lock
- `485fe91` fix: black
- `d9e5888` Merge remote-tracking branch 'origin/csm-next' into feat/performance-…
- `8d86ca7` fix: already processed epochs
- `9e2c951` feat: ChainConverter
- `6e95c01` refactor: types
- `46decb1` feat: better logging
- `39d50d2` feat: additional validation
- `d689979` refactor: db
- `fa5ed3c` feat: `epochs_demand`
- `a5d07f7` fix: `missing_epochs_in`
- `1653943` fix: logic, logging
- `aa871b3` fix: csm.execute_module
- `2a70199` feat: add `_post` for http_provider
- `db10267` fix: SafeBorder inheritance issue
- `0925e5f` fix: remove TODOs
- `84b2e47` feat: add `PERFORMANCE_COLLECTOR_DB_CONNECTION_TIMEOUT`
- `b08865a` feat: use finalized epoch if no demands
- `baf8c83` fix: `validate_state`
- `e72cff2` WIP: `test_csm_module.py`
- `1d2fb60` WIP: `test_csm_module.py`
- `e9758b0` fix: linter
- `f932b99` fix: `define_epochs_to_process_range`
- `13330ea` fix: `define_epochs_to_process_range`. Simple AI tests
- `84ff68e` fix: remove `DEFAULT_EPOCHS_STEP_TO_COLLECT`
- `6adb0ae` fix: review
- `cc080b3` fix: review
- `1203448` fix: remove old tests
- `3efa8b9` fix: imports in test
- `90c790e` fix: test_csm_module
- `d95584f` fix: linter
- `f676fa0` fix: some tests
- `9c1c902` fix: fork tests
- `53aac86` fix: review
- `a8732eb` fix: errors

All commits by vgorkavenko.
```diff
@@ -9,16 +9,7 @@
     CSM_CURRENT_FRAME_RANGE_R_EPOCH,
 )
 from src.metrics.prometheus.duration_meter import duration_meter
-from src.modules.csm.checkpoint import (
-    FrameCheckpointProcessor,
-    FrameCheckpointsIterator,
-    MinStepIsNotReached,
-)
-from src.modules.csm.distribution import (
-    Distribution,
-    DistributionResult,
-    StrikesValidator,
-)
+from src.modules.csm.distribution import Distribution, DistributionResult, StrikesValidator
 from src.modules.csm.helpers.last_report import LastReport
 from src.modules.csm.log import FramePerfLog
 from src.modules.csm.state import State
```
```diff
@@ -35,8 +26,11 @@
     EpochNumber,
     ReferenceBlockStamp,
     SlotNumber,
+    ValidatorIndex,
 )
 from src.utils.cache import global_lru_cache as lru_cache
+from src.utils.range import sequence
+from src.utils.validator_state import is_active_validator
 from src.utils.web3converter import Web3Converter
 from src.web3py.extensions.lido_validators import NodeOperatorId
 from src.web3py.types import Web3
```
```diff
@@ -77,24 +71,58 @@ def execute_module(self, last_finalized_blockstamp: BlockStamp) -> ModuleExecuteDelay:
         if not self._check_compatability(last_finalized_blockstamp):
             return ModuleExecuteDelay.NEXT_FINALIZED_EPOCH

+        collected = self.collect_data(last_finalized_blockstamp)
+        if not collected:
+            logger.info(
+                {"msg": "Data required for the report is not fully collected yet. Waiting for the next finalized epoch"}
+            )
+            return ModuleExecuteDelay.NEXT_FINALIZED_EPOCH
+
         report_blockstamp = self.get_blockstamp_for_report(last_finalized_blockstamp)
         if not report_blockstamp:
             return ModuleExecuteDelay.NEXT_FINALIZED_EPOCH

+        collected = self.collect_data(report_blockstamp)
+        if not collected:
+            return ModuleExecuteDelay.NEXT_FINALIZED_EPOCH
+
         self.process_report(report_blockstamp)
         return ModuleExecuteDelay.NEXT_SLOT

+    @duration_meter()
+    def collect_data(self, blockstamp: ReferenceBlockStamp) -> bool:
+        logger.info({"msg": "Collecting data for the report"})
+
+        converter = self.converter(blockstamp)
+
+        l_epoch, r_epoch = self.get_epochs_range_to_process(blockstamp)
+        logger.info({"msg": f"Epochs range for performance data collect: [{l_epoch};{r_epoch}]"})
+
+        self.state.migrate(l_epoch, r_epoch, converter.frame_config.epochs_per_frame)
+        self.state.log_progress()
+
+        if not self.state.is_fulfilled:
+            for l_epoch_, r_epoch_ in self.state.frames:
+                logger.info({
+                    "msg": "Requesting performance data availability check",
+                    "start_epoch": l_epoch_,
+                    "end_epoch": r_epoch_
+                })
+                is_data_range_available = self.w3.performance.is_range_available(
+                    l_epoch_, r_epoch_
+                )
+                if not is_data_range_available:
+                    logger.warning({"msg": f"Performance data range is not available yet for [{l_epoch_};{r_epoch_}] frame"})
+                    # TODO: set r_epoch for FrameCheckpointsIterator softly through POST request
+                    return False
+                else:
+                    logger.info({
+                        "msg": "Performance data range is available",
+                        "start_epoch": l_epoch_,
+                        "end_epoch": r_epoch_
+                    })
+            self.fulfill_state()
+
+        return self.state.is_fulfilled
+
     @lru_cache(maxsize=1)
     @duration_meter()
     def build_report(self, blockstamp: ReferenceBlockStamp) -> tuple:
-        self.validate_state(blockstamp)
+        l_epoch, r_epoch = self.get_epochs_range_to_process(blockstamp)
+        self.state.validate(l_epoch, r_epoch)

         last_report = self._get_last_report(blockstamp)
         rewards_tree_root, rewards_cid = last_report.rewards_tree_root, last_report.rewards_tree_cid
```
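The reworked `collect_data` acts as a gate: the module keeps returning `NEXT_FINALIZED_EPOCH` until the collector reports every frame's epoch range as available. A minimal sketch of that polling pattern follows; `is_range_available` and the per-frame loop come from the diff, while `StubPerformanceCollector` and `collect_ready` are hypothetical stand-ins for illustration only:

```python
from dataclasses import dataclass, field


@dataclass
class StubPerformanceCollector:
    """Hypothetical stand-in for the Performance Collector client."""
    available: set = field(default_factory=set)  # epochs that have data

    def is_range_available(self, l_epoch: int, r_epoch: int) -> bool:
        # A range is available only when every epoch in it is present.
        return all(e in self.available for e in range(l_epoch, r_epoch + 1))


def collect_ready(frames: list[tuple[int, int]], collector: StubPerformanceCollector) -> bool:
    """Mirror of the gating logic: every frame must be fully available."""
    return all(collector.is_range_available(l, r) for l, r in frames)


collector = StubPerformanceCollector(available=set(range(0, 48)))
frames = [(0, 31), (32, 63)]
print(collect_ready(frames, collector))  # second frame incomplete -> False
collector.available |= set(range(48, 64))
print(collect_ready(frames, collector))  # all data present -> True
```

The point of the pattern is that a single missing range short-circuits the whole cycle; nothing is processed until the collector has caught up.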
@@ -154,74 +182,73 @@ def is_reporting_allowed(self, blockstamp: ReferenceBlockStamp) -> bool: | |
| CONTRACT_ON_PAUSE.labels("csm").set(on_pause) | ||
| return not on_pause | ||
|
|
||
| def validate_state(self, blockstamp: ReferenceBlockStamp) -> None: | ||
| # NOTE: We cannot use `r_epoch` from the `current_frame_range` call because the `blockstamp` is a | ||
| # `ReferenceBlockStamp`, hence it's a block the frame ends at. We use `ref_epoch` instead. | ||
| l_epoch, _ = self.get_epochs_range_to_process(blockstamp) | ||
| r_epoch = blockstamp.ref_epoch | ||
|
|
||
| self.state.validate(l_epoch, r_epoch) | ||
|
|
||
| def collect_data(self, blockstamp: BlockStamp) -> bool: | ||
| """Ongoing report data collection for the estimated reference slot""" | ||
|
|
||
| logger.info({"msg": "Collecting data for the report"}) | ||
|
|
||
| converter = self.converter(blockstamp) | ||
|
|
||
| l_epoch, r_epoch = self.get_epochs_range_to_process(blockstamp) | ||
| logger.info({"msg": f"Epochs range for performance data collect: [{l_epoch};{r_epoch}]"}) | ||
|
|
||
| # NOTE: Finalized slot is the first slot of justifying epoch, so we need to take the previous. But if the first | ||
| # slot of the justifying epoch is empty, blockstamp.slot_number will point to the slot where the last finalized | ||
| # block was created. As a result, finalized_epoch in this case will be less than the actual number of the last | ||
| # finalized epoch. As a result we can have a delay in frame finalization. | ||
| finalized_epoch = EpochNumber(converter.get_epoch_by_slot(blockstamp.slot_number) - 1) | ||
|
|
||
| report_blockstamp = self.get_blockstamp_for_report(blockstamp) | ||
|
|
||
| if not report_blockstamp: | ||
| logger.info({"msg": "No report blockstamp available, using pre-computed one for collecting data"}) | ||
|
|
||
| if report_blockstamp and report_blockstamp.ref_epoch != r_epoch: | ||
| logger.warning( | ||
| { | ||
| "msg": f"Epochs range has been changed, but the change is not yet observed on finalized epoch {finalized_epoch}" | ||
| } | ||
| ) | ||
| return False | ||
|
|
||
| if l_epoch > finalized_epoch: | ||
| logger.info({"msg": "The starting epoch of the epochs range is not finalized yet"}) | ||
| return False | ||
|
|
||
| self.state.migrate(l_epoch, r_epoch, converter.frame_config.epochs_per_frame) | ||
| self.state.log_progress() | ||
|
|
||
| if self.state.is_fulfilled: | ||
| logger.info({"msg": "All epochs are already processed. Nothing to collect"}) | ||
| return True | ||
|
|
||
| try: | ||
| checkpoints = FrameCheckpointsIterator( | ||
| converter, | ||
| min(self.state.unprocessed_epochs), | ||
| r_epoch, | ||
| finalized_epoch, | ||
| ) | ||
| except MinStepIsNotReached: | ||
| return False | ||
|
|
||
| processor = FrameCheckpointProcessor(self.w3.cc, self.state, converter, blockstamp) | ||
|
|
||
| for checkpoint in checkpoints: | ||
| if self.get_epochs_range_to_process(self._receive_last_finalized_slot()) != (l_epoch, r_epoch): | ||
| logger.info({"msg": "Checkpoints were prepared for an outdated epochs range, stop processing"}) | ||
| raise ValueError("Outdated checkpoint") | ||
| processor.exec(checkpoint) | ||
| # Reset BaseOracle cycle timeout to avoid timeout errors during long checkpoints processing | ||
| self._reset_cycle_timeout() | ||
| return self.state.is_fulfilled | ||
| def fulfill_state(self): | ||
| finalized_blockstamp = self._receive_last_finalized_slot() | ||
| validators = self.w3.cc.get_validators(finalized_blockstamp) | ||
|
|
||
| logger.info({ | ||
| "msg": "Starting state fulfillment", | ||
| "total_frames": len(self.state.frames), | ||
| "total_validators": len(validators) | ||
| }) | ||
|
|
||
| for l_epoch, r_epoch in self.state.frames: | ||
| logger.info({ | ||
| "msg": "Processing frame", | ||
| "start_epoch": l_epoch, | ||
| "end_epoch": r_epoch, | ||
| "total_epochs": r_epoch - l_epoch + 1 | ||
| }) | ||
|
|
||
| for epoch in sequence(l_epoch, r_epoch): | ||
| if epoch not in self.state.unprocessed_epochs: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The same here |
||
| logger.debug({"msg": f"Epoch {epoch} is already processed"}) | ||
| continue | ||
|
|
||
| logger.info({ | ||
| "msg": "Requesting performance data from collector", | ||
| "epoch": epoch | ||
| }) | ||
| epoch_data = self.w3.performance.get_epoch(epoch) | ||
| if epoch_data is None: | ||
| logger.warning({"msg": f"Epoch {epoch} is missing in Performance Collector"}) | ||
| continue | ||
vgorkavenko marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| misses, props, syncs = epoch_data | ||
| logger.info({ | ||
| "msg": "Performance data received", | ||
| "epoch": epoch, | ||
| "misses_count": len(misses), | ||
| "proposals_count": len(props), | ||
| "sync_duties_count": len(syncs) | ||
| }) | ||
|
|
||
| for validator in validators: | ||
| missed_att = validator.index in misses | ||
| included_att = validator.index not in misses | ||
| is_active = is_active_validator(validator, EpochNumber(epoch)) | ||
| if not is_active and missed_att: | ||
| raise ValueError(f"Validator {validator.index} missed attestation in epoch {epoch}, but was not active") | ||
| self.state.save_att_duty(EpochNumber(epoch), validator.index, included=included_att) | ||
|
|
||
| blocks_in_epoch = 0 | ||
|
|
||
| for p in props: | ||
| vid = ValidatorIndex(p.validator_index) | ||
| self.state.save_prop_duty(EpochNumber(epoch), vid, included=bool(p.is_proposed)) | ||
| blocks_in_epoch += p.is_proposed | ||
|
|
||
| if blocks_in_epoch and syncs: | ||
| for rec in syncs: | ||
| vid = ValidatorIndex(rec.validator_index) | ||
| fulfilled = max(0, blocks_in_epoch - rec.missed_count) | ||
| for _ in range(fulfilled): | ||
| self.state.save_sync_duty(EpochNumber(epoch), vid, included=True) | ||
| for _ in range(rec.missed_count): | ||
| self.state.save_sync_duty(EpochNumber(epoch), vid, included=False) | ||
|
|
||
| self.state.add_processed_epoch(EpochNumber(epoch)) | ||
| self.state.log_progress() | ||
|
|
||
| def make_rewards_tree(self, shares: dict[NodeOperatorId, RewardsShares]) -> RewardsTree: | ||
| if not shares: | ||
|
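The per-epoch accounting in `fulfill_state` reduces to a few counting rules: an attestation is `included` unless the validator's index appears in `misses`, `blocks_in_epoch` is the number of actually proposed blocks, and each sync-committee member is credited `blocks_in_epoch - missed_count` fulfilled sync duties, floored at zero. A standalone sketch of that arithmetic; the record shapes are assumptions for illustration, not the collector's real schema:

```python
from typing import NamedTuple


class Proposal(NamedTuple):
    validator_index: int
    is_proposed: bool


class SyncRecord(NamedTuple):
    validator_index: int
    missed_count: int


def sync_duties(props: list[Proposal], syncs: list[SyncRecord]) -> dict[int, tuple[int, int]]:
    """Return {validator_index: (fulfilled, missed)} sync-duty counts for one epoch."""
    blocks_in_epoch = sum(p.is_proposed for p in props)
    result: dict[int, tuple[int, int]] = {}
    # No proposed blocks means no sync duties to account for in this epoch.
    if blocks_in_epoch and syncs:
        for rec in syncs:
            fulfilled = max(0, blocks_in_epoch - rec.missed_count)
            result[rec.validator_index] = (fulfilled, rec.missed_count)
    return result


props = [Proposal(1, True), Proposal(2, False), Proposal(3, True)]  # 2 blocks proposed
syncs = [SyncRecord(10, 0), SyncRecord(11, 3)]
print(sync_duties(props, syncs))  # {10: (2, 0), 11: (0, 3)}
```

Note the floor: a `missed_count` larger than `blocks_in_epoch` (validator 11 above) yields zero fulfilled duties rather than a negative count, matching the `max(0, ...)` in the diff.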
|
@@ -259,43 +286,33 @@ def publish_log(self, logs: list[FramePerfLog]) -> CID: | |
| return log_cid | ||
|
|
||
| @lru_cache(maxsize=1) | ||
| def get_epochs_range_to_process(self, blockstamp: BlockStamp) -> tuple[EpochNumber, EpochNumber]: | ||
| def get_epochs_range_to_process(self, blockstamp: ReferenceBlockStamp) -> tuple[EpochNumber, EpochNumber]: | ||
| converter = self.converter(blockstamp) | ||
|
|
||
| far_future_initial_epoch = converter.get_epoch_by_timestamp(UINT64_MAX) | ||
| if converter.frame_config.initial_epoch == far_future_initial_epoch: | ||
| raise ValueError("CSM oracle initial epoch is not set yet") | ||
|
|
||
| l_ref_slot = last_processing_ref_slot = self.w3.csm.get_csm_last_processing_ref_slot(blockstamp) | ||
| r_ref_slot = initial_ref_slot = self.get_initial_ref_slot(blockstamp) | ||
|
|
||
| if last_processing_ref_slot > blockstamp.slot_number: | ||
| raise InconsistentData(f"{last_processing_ref_slot=} > {blockstamp.slot_number=}") | ||
|
|
||
| # The very first report, no previous ref slot. | ||
| if not last_processing_ref_slot: | ||
| initial_ref_slot = self.get_initial_ref_slot(blockstamp) | ||
| l_ref_slot = SlotNumber(initial_ref_slot - converter.slots_per_frame) | ||
| if l_ref_slot < 0: | ||
| raise CSMError("Invalid frame configuration for the current network") | ||
|
|
||
| # NOTE: before the initial slot the contract can't return current frame | ||
| if blockstamp.slot_number > initial_ref_slot: | ||
| r_ref_slot = self.get_initial_or_current_frame(blockstamp).ref_slot | ||
|
|
||
| # We are between reports, next report slot didn't happen yet. Predicting the next ref slot for the report | ||
| # to calculate epochs range to collect the data. | ||
| if l_ref_slot == r_ref_slot: | ||
| r_ref_slot = converter.get_epoch_last_slot( | ||
| EpochNumber(converter.get_epoch_by_slot(l_ref_slot) + converter.frame_config.epochs_per_frame) | ||
| ) | ||
|
|
||
| r_ref_slot = blockstamp.slot_number | ||
| if l_ref_slot < last_processing_ref_slot: | ||
| raise CSMError(f"Got invalid epochs range: {l_ref_slot=} < {last_processing_ref_slot=}") | ||
| if l_ref_slot >= r_ref_slot: | ||
| raise CSMError(f"Got invalid epochs range {r_ref_slot=}, {l_ref_slot=}") | ||
|
|
||
| l_epoch = converter.get_epoch_by_slot(SlotNumber(l_ref_slot + 1)) | ||
| r_epoch = converter.get_epoch_by_slot(r_ref_slot) | ||
| r_epoch = blockstamp.ref_epoch | ||
|
|
||
| # Update Prometheus metrics | ||
| CSM_CURRENT_FRAME_RANGE_L_EPOCH.set(l_epoch) | ||
|
|
||
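Once the ref slots are fixed, the epochs range is plain slot arithmetic: the left epoch is the epoch of the slot right after the last processed ref slot, and the right epoch is the blockstamp's `ref_epoch`. A sketch with mainnet-style parameters (32 slots per epoch is an assumption for illustration; the real value comes from the chain config):

```python
SLOTS_PER_EPOCH = 32  # assumed for illustration


def epoch_by_slot(slot: int) -> int:
    return slot // SLOTS_PER_EPOCH


def epochs_range(last_processing_ref_slot: int, ref_epoch: int) -> tuple[int, int]:
    # l_epoch: epoch of the slot immediately after the last processed ref slot.
    # r_epoch: taken directly from the reference blockstamp.
    l_epoch = epoch_by_slot(last_processing_ref_slot + 1)
    return l_epoch, ref_epoch


# Last report processed up to slot 6399 (the last slot of epoch 199);
# the current reference epoch is 225.
print(epochs_range(6399, 225))  # (200, 225)
```

Adding one to the ref slot before converting is what makes consecutive ranges abut without overlap: slot 6399 belongs to epoch 199, slot 6400 opens epoch 200.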
Review comment:
How can we be sure that `self.state.frames` is not empty? It looks like `fulfill_state` has prerequisites that must hold before it runs. If so, it would be better to check those requirements explicitly and raise an exception when they are not met.
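One way to address this review comment is a fail-fast guard at the top of `fulfill_state`. The sketch below uses a minimal stand-in `State` (the `migrate` frame-splitting shown is an assumption, not the project's real implementation), and the exception type is a suggestion rather than project convention:

```python
class State:
    """Minimal stand-in for src.modules.csm.state.State (illustration only)."""

    def __init__(self):
        self.frames: list[tuple[int, int]] = []

    def migrate(self, l_epoch: int, r_epoch: int, epochs_per_frame: int) -> None:
        # Split [l_epoch, r_epoch] into frames of epochs_per_frame epochs each.
        self.frames = [
            (l, min(l + epochs_per_frame - 1, r_epoch))
            for l in range(l_epoch, r_epoch + 1, epochs_per_frame)
        ]


def check_fulfill_preconditions(state: State) -> None:
    # Guard suggested in review: fulfill_state depends on migrate() having run.
    if not state.frames:
        raise ValueError("fulfill_state called before state.migrate(): no frames to process")


state = State()
try:
    check_fulfill_preconditions(state)
except ValueError as e:
    print(e)  # fires: migrate() has not run yet

state.migrate(0, 63, 32)
check_fulfill_preconditions(state)  # passes now
print(state.frames)  # [(0, 31), (32, 63)]
```

The guard turns a silent no-op loop over empty `frames` into an explicit error at the call site, which is what the reviewer is asking for.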