diff --git a/src/constants.py b/src/constants.py index f385b4b0b..9d54a09e3 100644 --- a/src/constants.py +++ b/src/constants.py @@ -62,5 +62,6 @@ ALLOWED_KAPI_VERSION = Version('1.5.0') CSM_STATE_VERSION = 1 +CSM_LOGS_VERSION = 1 GENESIS_VALIDATORS_ROOT = bytes([0] * 32) # all zeros for deposits diff --git a/src/modules/csm/csm.py b/src/modules/csm/csm.py index d2a22ddde..3318e9c85 100644 --- a/src/modules/csm/csm.py +++ b/src/modules/csm/csm.py @@ -20,7 +20,7 @@ StrikesValidator, ) from src.modules.csm.helpers.last_report import LastReport -from src.modules.csm.log import FramePerfLog +from src.modules.csm.log import Logs from src.modules.csm.state import State from src.modules.csm.tree import RewardsTree, StrikesTree, Tree from src.modules.csm.types import ReportData, RewardsShares, StrikesList @@ -136,6 +136,7 @@ def _get_last_report(self, blockstamp: ReferenceBlockStamp) -> LastReport: current_frame = self.get_frame_number_by_slot(blockstamp) return LastReport.load(self.w3, blockstamp, current_frame) + @lru_cache(maxsize=1) def calculate_distribution(self, blockstamp: ReferenceBlockStamp, last_report: LastReport) -> DistributionResult: distribution = Distribution(self.w3, self.converter(blockstamp), self.state) result = distribution.calculate(blockstamp, last_report) @@ -253,8 +254,8 @@ def publish_tree(self, tree: Tree) -> CID: logger.info({"msg": "Tree dump uploaded to IPFS", "cid": repr(tree_cid)}) return tree_cid - def publish_log(self, logs: list[FramePerfLog]) -> CID: - log_cid = self.w3.ipfs.publish(FramePerfLog.encode(logs)) + def publish_log(self, logs: Logs) -> CID: + log_cid = self.w3.ipfs.publish(logs.encode()) logger.info({"msg": "Frame(s) log uploaded to IPFS", "cid": repr(log_cid)}) return log_cid diff --git a/src/modules/csm/distribution.py b/src/modules/csm/distribution.py index 9301c9c39..7e140aa45 100644 --- a/src/modules/csm/distribution.py +++ b/src/modules/csm/distribution.py @@ -4,8 +4,9 @@ from copy import deepcopy from dataclasses import dataclass, field +from src.constants import MIN_ACTIVATION_BALANCE, MAX_EFFECTIVE_BALANCE_ELECTRA, EFFECTIVE_BALANCE_INCREMENT from src.modules.csm.helpers.last_report import LastReport -from src.modules.csm.log import FramePerfLog, OperatorFrameSummary +from src.modules.csm.log import FramePerfLog, OperatorFrameSummary, Logs from src.modules.csm.state import Frame, State, ValidatorDuties from src.modules.csm.types import ( ParticipationShares, @@ -48,7 +49,7 @@ class DistributionResult: total_rebate: RewardsShares = 0 total_rewards_map: dict[NodeOperatorId, RewardsShares] = field(default_factory=lambda: defaultdict(RewardsShares)) strikes: dict[StrikesValidator, StrikesList] = field(default_factory=lambda: defaultdict(StrikesList)) - logs: list[FramePerfLog] = field(default_factory=list) + logs: Logs = field(default_factory=Logs) class Distribution: @@ -99,7 +100,7 @@ def calculate(self, blockstamp: ReferenceBlockStamp, last_report: LastReport) -> for no_id, rewards in rewards_map_in_frame.items(): result.total_rewards_map[no_id] += rewards - result.logs.append(frame_log) + result.logs.frames.append(frame_log) if result.total_rewards != sum(result.total_rewards_map.values()): raise InconsistentData( @@ -157,7 +158,10 @@ def _calculate_distribution_in_frame( curve_params = self.w3.csm.get_curve_params(no_id, blockstamp) log_operator.performance_coefficients = curve_params.perf_coeffs - active_validators.sort(key=lambda v: v.index) + # Sort from biggest to smallest balance and by index from oldest to newest. + active_validators.sort( + key=lambda v: (-min(v.validator.effective_balance, MAX_EFFECTIVE_BALANCE_ELECTRA), v.index) + ) numbered_validators = enumerate(active_validators, 1) for key_number, validator in numbered_validators: key_threshold = max(network_perf - curve_params.perf_leeway_data.get_for(key_number), 0) @@ -254,10 +258,22 @@ def get_validator_duties_outcome( # 87.55 ≈ 88 of 103 participation shares should be counted for the operator key's reward. # The rest 15 participation shares should be counted for the protocol's rebate. # - participation_share = math.ceil(duties.attestation.assigned * reward_share) - rebate_share = duties.attestation.assigned - participation_share + val_effective_balance = min( + max(MIN_ACTIVATION_BALANCE, validator.validator.effective_balance), + MAX_EFFECTIVE_BALANCE_ELECTRA + ) + participation_share_multiplier = val_effective_balance // EFFECTIVE_BALANCE_INCREMENT + # + # Due to CL rewarding process, validators are getting rewards in proportion to their effective balance: + # https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/beacon-chain.md#helpers + # Distribution should calculate participation and rebate shares proportional to effective balance as well. + # NOTE: Moment of getting 2048 ETH can be too close to the report and trick the distribution. + # + assigned_att = duties.attestation.assigned * participation_share_multiplier + participation_share = math.ceil(assigned_att * reward_share) + rebate_share = assigned_att - participation_share if rebate_share < 0: - raise ValueError(f"Invalid rebate share: {rebate_share=}") + raise ValueError(f"Invalid rebate share for validator {validator.index}: {rebate_share=}") return ValidatorDutiesOutcome(participation_share, rebate_share, strikes=0) # In case of bad performance the validator should be striked and assigned attestations are not counted for diff --git a/src/modules/csm/log.py b/src/modules/csm/log.py index 3dfef5be3..745de0aaa 100644 --- a/src/modules/csm/log.py +++ b/src/modules/csm/log.py @@ -2,6 +2,7 @@ from collections import defaultdict from dataclasses import asdict, dataclass, field +from src.constants import CSM_LOGS_VERSION from src.modules.csm.state import DutyAccumulator from src.modules.csm.types import RewardsShares from src.providers.execution.contracts.cs_parameters_registry import PerformanceCoefficients @@ -43,14 +44,19 @@ class FramePerfLog: default_factory=lambda: defaultdict(OperatorFrameSummary) ) - @staticmethod - def encode(logs: list['FramePerfLog']) -> bytes: + +@dataclass +class Logs: + frames: list[FramePerfLog] = field(default_factory=list) + _ver: int = CSM_LOGS_VERSION + + def encode(self) -> bytes: return ( LogJSONEncoder( indent=None, separators=(',', ':'), sort_keys=True, ) - .encode([asdict(log) for log in logs]) + .encode(asdict(self)) .encode() ) diff --git a/src/providers/execution/contracts/cs_accounting.py b/src/providers/execution/contracts/cs_accounting.py index 7fc73f942..4e6de35f3 100644 --- a/src/providers/execution/contracts/cs_accounting.py +++ b/src/providers/execution/contracts/cs_accounting.py @@ -18,10 +18,10 @@ class CSAccountingContract(ContractInterface): def fee_distributor(self, block_identifier: BlockIdentifier = "latest") -> ChecksumAddress: """Returns the address of the CSFeeDistributor contract""" - resp = self.functions.feeDistributor().call(block_identifier=block_identifier) + resp = self.functions.FEE_DISTRIBUTOR().call(block_identifier=block_identifier) logger.info( { - "msg": "Call `feeDistributor()`.", + "msg": "Call `FEE_DISTRIBUTOR()`.", "value": resp, "block_identifier": repr(block_identifier), } diff --git a/tests/modules/csm/test_csm_distribution.py b/tests/modules/csm/test_csm_distribution.py index 93beedf8d..649122237 100644 --- a/tests/modules/csm/test_csm_distribution.py +++ b/tests/modules/csm/test_csm_distribution.py @@ -1,4 +1,5 @@ import re +import math from collections import defaultdict from unittest.mock import Mock @@ -6,7 +7,11 @@ from hexbytes import HexBytes from web3.types import Wei -from src.constants import TOTAL_BASIS_POINTS +from src.constants import ( + TOTAL_BASIS_POINTS, + MIN_ACTIVATION_BALANCE, + EFFECTIVE_BALANCE_INCREMENT, +) from src.modules.csm.distribution import Distribution, ValidatorDuties, ValidatorDutiesOutcome from src.modules.csm.log import FramePerfLog, ValidatorFrameSummary, OperatorFrameSummary from src.modules.csm.state import DutyAccumulator, State, NetworkDuties, Frame @@ -20,7 +25,7 @@ KeyNumberValueIntervalList, ) from src.providers.execution.exceptions import InconsistentData -from src.types import NodeOperatorId, EpochNumber, ValidatorIndex, ReferenceBlockStamp +from src.types import NodeOperatorId, EpochNumber, ValidatorIndex from src.web3py.extensions import CSM from src.web3py.types import Web3 from tests.factory.blockstamp import ReferenceBlockStampFactory @@ -309,8 +314,8 @@ def test_calculate_distribution( assert result.total_rebate == expected_total_rebate assert result.strikes == expected_strikes - assert len(result.logs) == len(frames) - for i, log in enumerate(result.logs): + assert len(result.logs.frames) == len(frames) + for i, log in enumerate(result.logs.frames): assert log.blockstamp == frame_blockstamps[i] assert log.frame == frames[i] @@ -532,40 +537,64 @@ def test_calculate_distribution_handles_invalid_distribution_in_total(): # Operator 1. One above threshold performance, one slashed (..., NodeOperatorId(1)): [ LidoValidatorFactory.build( - index=ValidatorIndex(1), validator=ValidatorStateFactory.build(slashed=False, pubkey="0x01") + index=ValidatorIndex(1), + validator=ValidatorStateFactory.build( + slashed=False, pubkey="0x01", effective_balance=MIN_ACTIVATION_BALANCE + ), ), LidoValidatorFactory.build( - index=ValidatorIndex(2), validator=ValidatorStateFactory.build(slashed=True, pubkey="0x02") + index=ValidatorIndex(2), + validator=ValidatorStateFactory.build( + slashed=True, pubkey="0x02", effective_balance=MIN_ACTIVATION_BALANCE + ), ), ], # Operator 2. One above threshold performance, one below (..., NodeOperatorId(2)): [ LidoValidatorFactory.build( - index=ValidatorIndex(3), validator=ValidatorStateFactory.build(slashed=False, pubkey="0x03") + index=ValidatorIndex(3), + validator=ValidatorStateFactory.build( + slashed=False, pubkey="0x03", effective_balance=MIN_ACTIVATION_BALANCE + ), ), LidoValidatorFactory.build( - index=ValidatorIndex(4), validator=ValidatorStateFactory.build(slashed=False, pubkey="0x04") + index=ValidatorIndex(4), + validator=ValidatorStateFactory.build( + slashed=False, pubkey="0x04", effective_balance=MIN_ACTIVATION_BALANCE + ), ), ], # Operator 3. All below threshold performance (..., NodeOperatorId(3)): [ LidoValidatorFactory.build( - index=ValidatorIndex(5), validator=ValidatorStateFactory.build(slashed=False, pubkey="0x05") + index=ValidatorIndex(5), + validator=ValidatorStateFactory.build( + slashed=False, pubkey="0x05", effective_balance=MIN_ACTIVATION_BALANCE + ), ), ], # Operator 4. No duties (..., NodeOperatorId(4)): [ LidoValidatorFactory.build( - index=ValidatorIndex(6), validator=ValidatorStateFactory.build(slashed=False, pubkey="0x06") + index=ValidatorIndex(6), + validator=ValidatorStateFactory.build( + slashed=False, pubkey="0x06", effective_balance=MIN_ACTIVATION_BALANCE + ), ), ], # Operator 5. All above threshold performance (..., NodeOperatorId(5)): [ LidoValidatorFactory.build( - index=ValidatorIndex(7), validator=ValidatorStateFactory.build(slashed=False, pubkey="0x07") + index=ValidatorIndex(7), + validator=ValidatorStateFactory.build( + slashed=False, pubkey="0x07", effective_balance=MIN_ACTIVATION_BALANCE + ), ), LidoValidatorFactory.build( - index=ValidatorIndex(8), validator=ValidatorStateFactory.build(slashed=False, pubkey="0x08") + index=ValidatorIndex(8), + validator=ValidatorStateFactory.build( + slashed=False, pubkey="0x08", effective_balance=MIN_ACTIVATION_BALANCE + ), ), ], }, @@ -887,7 +916,18 @@ def test_get_network_performance_raises_error_for_invalid_performance(): False, 0.5, 1, - ValidatorDutiesOutcome(participation_share=10, rebate_share=0, strikes=0), + ValidatorDutiesOutcome(participation_share=10 * 32, rebate_share=0, strikes=0), + ), + ( + ValidatorDuties( + attestation=DutyAccumulator(assigned=10, included=10), + proposal=DutyAccumulator(assigned=10, included=10), + sync=DutyAccumulator(assigned=10, included=10), + ), + False, + 0.5, + 0.85, + ValidatorDutiesOutcome(participation_share=272, rebate_share=48, strikes=0), ), ( ValidatorDuties( @@ -924,6 +964,7 @@ def test_get_network_performance_raises_error_for_invalid_performance(): def test_process_validator_duty(validator_duties, is_slashed, threshold, reward_share, expected_outcome): validator = LidoValidatorFactory.build() validator.validator.slashed = is_slashed + validator.validator.effective_balance = MIN_ACTIVATION_BALANCE log_operator = Mock() log_operator.validators = defaultdict(ValidatorFrameSummary) @@ -1188,3 +1229,116 @@ def test_interval_mapping_raises_error_for_key_number_out_of_range(): reward_share = KeyNumberValueIntervalList([KeyNumberValueInterval(11, 10000)]) with pytest.raises(ValueError, match="No value found for key number=2"): reward_share.get_for(2) + + +@pytest.mark.parametrize("multiplier", [1, 2, 3, 64]) +@pytest.mark.unit +def test_get_validator_duties_outcome_scales_by_effective_balance(multiplier: int): + validator = LidoValidatorFactory.build() + validator.validator.slashed = False + validator.validator.effective_balance = MIN_ACTIVATION_BALANCE * multiplier + + duties = ValidatorDuties( + attestation=DutyAccumulator(assigned=10, included=10), + proposal=None, + sync=None, + ) + + threshold = 0.0 + reward_share = 0.5 + log_operator = Mock() + log_operator.validators = defaultdict(ValidatorFrameSummary) + + outcome = Distribution.get_validator_duties_outcome( + validator, + duties, + threshold, + reward_share, + PerformanceCoefficients(), + log_operator, + ) + + expected_assigned = 10 * MIN_ACTIVATION_BALANCE * multiplier // EFFECTIVE_BALANCE_INCREMENT + expected_participation = math.ceil(expected_assigned * reward_share) + expected_rebate = expected_assigned - expected_participation + + assert outcome == ValidatorDutiesOutcome( + participation_share=expected_participation, + rebate_share=expected_rebate, + strikes=0, + ) + + +@pytest.mark.unit +def test_calculate_distribution_in_frame_assigns_keys_by_sorted_order(): + w3 = Mock(spec=Web3, csm=Mock()) + reward_share_data = Mock() + reward_share_data.get_for = Mock(side_effect=lambda k: {1: 1.0, 2: 0.9, 3: 0.8, 4: 0.7, 5: 0.6, 6: 0.5}[k]) + w3.csm.get_curve_params = Mock( + return_value=CurveParams( + strikes_params=..., + perf_leeway_data=Mock(get_for=Mock(return_value=0.0)), + reward_share_data=reward_share_data, + perf_coeffs=PerformanceCoefficients(attestations_weight=1, blocks_weight=0, sync_weight=0), + ) + ) + + distribution = Distribution(w3, converter=..., state=State()) + distribution._get_network_performance = Mock(return_value=0.9) + + frame = (EpochNumber(0), EpochNumber(31)) + blockstamp = ReferenceBlockStampFactory.build(ref_epoch=31) + log = FramePerfLog(blockstamp, frame) + + # Three validators with different indices and balances; final order expected by index asc + v_idx5 = LidoValidatorFactory.build(index=ValidatorIndex(5)) + v_idx7 = LidoValidatorFactory.build(index=ValidatorIndex(7)) + v_idx8 = LidoValidatorFactory.build(index=ValidatorIndex(8)) + v_idx9 = LidoValidatorFactory.build(index=ValidatorIndex(9)) + v_idx10 = LidoValidatorFactory.build(index=ValidatorIndex(10)) + v_idx6 = LidoValidatorFactory.build(index=ValidatorIndex(6)) + v_idx5.validator.slashed = False + v_idx7.validator.slashed = False + v_idx8.validator.slashed = False + v_idx9.validator.slashed = False + v_idx10.validator.slashed = False + v_idx6.validator.slashed = False + + v_idx5.validator.effective_balance = MIN_ACTIVATION_BALANCE + v_idx7.validator.effective_balance = MIN_ACTIVATION_BALANCE * 3 + v_idx8.validator.effective_balance = MIN_ACTIVATION_BALANCE * 2 + v_idx10.validator.effective_balance = MIN_ACTIVATION_BALANCE * 2 + v_idx9.validator.effective_balance = MIN_ACTIVATION_BALANCE * 65 + v_idx6.validator.effective_balance = MIN_ACTIVATION_BALANCE * 64 + + # Same perfect duties for all + distribution.state.data = { + frame: NetworkDuties( + attestations=defaultdict( + DutyAccumulator, + { + v_idx5.index: DutyAccumulator(assigned=10, included=10), + v_idx7.index: DutyAccumulator(assigned=10, included=10), + v_idx8.index: DutyAccumulator(assigned=10, included=10), + v_idx9.index: DutyAccumulator(assigned=10, included=10), + v_idx10.index: DutyAccumulator(assigned=10, included=10), + v_idx6.index: DutyAccumulator(assigned=10, included=10), + }, + ), + proposals=defaultdict(DutyAccumulator), + syncs=defaultdict(DutyAccumulator), + ) + } + + operators_to_validators = { + (..., NodeOperatorId(1)): [v_idx10, v_idx7, v_idx5, v_idx8, v_idx9, v_idx6], + } + + distribution._calculate_distribution_in_frame(frame, blockstamp, Wei(300), operators_to_validators, log) + + assert log.operators[NodeOperatorId(1)].validators[ValidatorIndex(6)].rewards_share == 1.0 + assert log.operators[NodeOperatorId(1)].validators[ValidatorIndex(9)].rewards_share == 0.9 + assert log.operators[NodeOperatorId(1)].validators[ValidatorIndex(7)].rewards_share == 0.8 + assert log.operators[NodeOperatorId(1)].validators[ValidatorIndex(8)].rewards_share == 0.7 + assert log.operators[NodeOperatorId(1)].validators[ValidatorIndex(10)].rewards_share == 0.6 + assert log.operators[NodeOperatorId(1)].validators[ValidatorIndex(5)].rewards_share == 0.5 diff --git a/tests/modules/csm/test_csm_module.py b/tests/modules/csm/test_csm_module.py index 3308bafc5..6251a27f8 100644 --- a/tests/modules/csm/test_csm_module.py +++ b/tests/modules/csm/test_csm_module.py @@ -7,9 +7,10 @@ import pytest from hexbytes import HexBytes -from src.constants import UINT64_MAX +from src.constants import UINT64_MAX, CSM_LOGS_VERSION from src.modules.csm.csm import CSOracle, LastReport from src.modules.csm.distribution import Distribution +from src.modules.csm.log import Logs from src.modules.csm.state import State from src.modules.csm.tree import RewardsTree, StrikesTree from src.modules.csm.types import StrikesList @@ -402,7 +403,7 @@ class BuildReportTestParam: total_rewards_map=defaultdict(int), total_rebate=0, strikes=defaultdict(dict), - logs=[Mock()], + logs=Logs(frames=[Mock()]), ) ), curr_rewards_tree_root=HexBytes(ZERO_HASH), @@ -444,7 +445,7 @@ class BuildReportTestParam: ), total_rebate=1, strikes=defaultdict(dict), - logs=[Mock()], + logs=Logs(frames=[Mock()]), ) ), curr_rewards_tree_root=HexBytes("NEW_TREE_ROOT".encode()), @@ -494,7 +495,7 @@ class BuildReportTestParam: ), total_rebate=1, strikes=defaultdict(dict), - logs=[Mock()], + logs=Logs(frames=[Mock()]), ) ), curr_rewards_tree_root=HexBytes("NEW_TREE_ROOT".encode()), @@ -536,7 +537,7 @@ class BuildReportTestParam: total_rewards_map=defaultdict(int), total_rebate=0, strikes=defaultdict(dict), - logs=[Mock()], + logs=Logs(frames=[Mock()]), ) ), curr_rewards_tree_root=HexBytes(32), @@ -583,6 +584,7 @@ def test_build_report(module: CSOracle, param: BuildReportTestParam): assert module.make_rewards_tree.call_args == param.expected_make_rewards_tree_call_args assert report == param.expected_func_result + assert module.publish_log.call_args[0][0]._ver == CSM_LOGS_VERSION @pytest.mark.unit @@ -633,6 +635,37 @@ def test_execute_module_processed(module: CSOracle): assert execute_delay is ModuleExecuteDelay.NEXT_SLOT +@pytest.mark.unit +def test_calculate_distribution_lru_cache(module: CSOracle): + blockstamp = Mock() + last_report = Mock() + mock_distribution_result = Mock() + + with patch('src.modules.csm.csm.Distribution') as MockDistribution: + mock_distribution_instance = MockDistribution.return_value + mock_distribution_instance.calculate.return_value = mock_distribution_result + + module.converter = Mock() + module.state = Mock() + + result1 = module.calculate_distribution(blockstamp, last_report) + + result2 = module.calculate_distribution(blockstamp, last_report) + + assert result1 is result2 + assert result1 is mock_distribution_result + + assert MockDistribution.call_count == 1 + assert mock_distribution_instance.calculate.call_count == 1 + + module.calculate_distribution.cache_clear() + + result3 = module.calculate_distribution(blockstamp, last_report) + + assert MockDistribution.call_count == 2 + assert result3 is mock_distribution_result + + @dataclass(frozen=True) class RewardsTreeTestParam: shares: dict[NodeOperatorId, int] diff --git a/tests/modules/csm/test_log.py b/tests/modules/csm/test_log.py index c8fd64c76..cf02ec334 100644 --- a/tests/modules/csm/test_log.py +++ b/tests/modules/csm/test_log.py @@ -1,7 +1,8 @@ import json import pytest -from src.modules.csm.log import FramePerfLog, DutyAccumulator +from src.constants import CSM_LOGS_VERSION +from src.modules.csm.log import FramePerfLog, DutyAccumulator, Logs from src.providers.execution.contracts.cs_parameters_registry import PerformanceCoefficients from src.types import EpochNumber, NodeOperatorId, ReferenceBlockStamp from tests.factory.blockstamp import ReferenceBlockStampFactory @@ -54,12 +55,15 @@ def test_logs_encode(log: FramePerfLog): log_2.distributed_rewards = 0 log_2.rebate_to_protocol = 0 - logs = [log, log_2] + logs = Logs(frames=[log, log_2]) - encoded = FramePerfLog.encode(logs) + encoded = logs.encode() - decoded_logs = json.loads(encoded) + decoded_logs_data = json.loads(encoded) + assert decoded_logs_data['_ver'] == 1 + + decoded_logs = decoded_logs_data['frames'] for decoded in decoded_logs: assert decoded["operators"]["42"]["validators"]["41337"]["attestation_duty"]["assigned"] == 220 assert decoded["operators"]["42"]["validators"]["41337"]["attestation_duty"]["included"] == 119