diff --git a/meta/meta.json b/meta/meta.json index d61abf6c1..7c4772e75 100644 --- a/meta/meta.json +++ b/meta/meta.json @@ -1,3 +1,3 @@ { - "subnet_version": "4.2.11" + "subnet_version": "4.3.0" } diff --git a/neurons/validator.py b/neurons/validator.py index 74a9bbe81..8594519bb 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -96,7 +96,7 @@ def __init__(self): self.config = self.get_config() # Use the getattr function to safely get the autosync attribute with a default of False if not found. - self.auto_sync = getattr(self.config, 'autosync', False) and 'mothership' not in ValiUtils.get_secrets() + self.auto_sync = False self.is_mainnet = self.config.netuid == 8 # Ensure the directory for logging exists, else create one. if not os.path.exists(self.config.full_path): diff --git a/tests/vali_tests/test_p2p_syncer.py b/tests/vali_tests/test_p2p_syncer.py index 49ed4cd81..1b8ffd470 100644 --- a/tests/vali_tests/test_p2p_syncer.py +++ b/tests/vali_tests/test_p2p_syncer.py @@ -199,6 +199,10 @@ def test_checkpoint_syncing_order_not_in_majority(self): assert len(self.p2p_syncer.golden["positions"][self.DEFAULT_MINER_HOTKEY]["positions"][0]["orders"]) == 2 def test_checkpoint_syncing_order_not_in_majority_with_multiple_positions(self): + """ + position 1 has order 1 and 2 in the majority, order 0 heuristic matches with 1 so it is not an additional order + position 2, order 3 and 4 heuristic match together + """ order1 = deepcopy(self.default_order) order1.order_uuid = "test_order1" order2 = deepcopy(self.default_order) @@ -253,7 +257,7 @@ def test_checkpoint_syncing_order_not_in_majority_with_multiple_positions(self): assert len(self.p2p_syncer.golden["positions"][self.DEFAULT_MINER_HOTKEY]["positions"]) == 2 assert len(self.p2p_syncer.golden["positions"][self.DEFAULT_MINER_HOTKEY]["positions"][0]["orders"]) == 2 - assert len(self.p2p_syncer.golden["positions"][self.DEFAULT_MINER_HOTKEY]["positions"][1]["orders"]) == 2 + assert 
len(self.p2p_syncer.golden["positions"][self.DEFAULT_MINER_HOTKEY]["positions"][1]["orders"]) == 1 def test_checkpoint_syncing_position_not_in_majority(self): order1 = deepcopy(self.default_order) @@ -341,6 +345,9 @@ def test_checkpoint_syncing_multiple_positions(self): assert len(self.p2p_syncer.golden["positions"][self.DEFAULT_MINER_HOTKEY]["positions"][0]["orders"]) == 1 def test_checkpoint_syncing_multiple_miners(self): + """ + miners included as long as they appear in majority of checkpoints + """ order1 = deepcopy(self.default_order) order1.order_uuid = "test_order1" orders = [order1] @@ -349,19 +356,20 @@ def test_checkpoint_syncing_multiple_miners(self): position.orders = orders position.rebuild_position_with_updated_orders() - checkpoint1 = {"positions": {self.DEFAULT_MINER_HOTKEY: {"positions": [json.loads(position.to_json_string())]}}} - checkpoint2 = {"positions": {self.DEFAULT_MINER_HOTKEY: {"positions": [json.loads(position.to_json_string())]}}} - order0 = deepcopy(self.default_order) order0.order_uuid = "test_order0" orders = [order0] - position = deepcopy(self.default_position) - position.position_uuid = "test_position2" - position.orders = orders - position.rebuild_position_with_updated_orders() + position1 = deepcopy(self.default_position) + position1.position_uuid = "test_position2" + position1.orders = orders + position1.rebuild_position_with_updated_orders() - checkpoint3 = {"positions": {"diff_miner": {"positions": [json.loads(position.to_json_string())]}}} - checkpoint4 = {"positions": {"diff_miner": {"positions": [json.loads(position.to_json_string())]}}} + checkpoint1 = {"positions": {self.DEFAULT_MINER_HOTKEY: {"positions": [json.loads(position.to_json_string())]}}} + checkpoint2 = {"positions": {self.DEFAULT_MINER_HOTKEY: {"positions": [json.loads(position.to_json_string())]}, + "diff_miner": {"positions": [json.loads(position1.to_json_string())]}}} + + checkpoint3 = {"positions": {"diff_miner": {"positions": 
[json.loads(position1.to_json_string())]}}} + checkpoint4 = {"positions": {"diff_miner": {"positions": [json.loads(position1.to_json_string())]}, self.DEFAULT_MINER_HOTKEY: {"positions": [json.loads(position.to_json_string())]}}} checkpoints = {"test_validator1": [0, checkpoint1], "test_validator2": [0, checkpoint2], "test_validator3": [0, checkpoint3], "test_validator4": [0, checkpoint4]} @@ -380,6 +388,9 @@ def test_checkpoint_syncing_one_of_each_miner(self): pass def test_checkpoint_syncing_miner_not_in_majority(self): + """ + if the miner does not appear in the majority of checkpoints it will not be included + """ order1 = deepcopy(self.default_order) order1.order_uuid = "test_order1" orders = [order1] diff --git a/vali_objects/utils/p2p_syncer.py b/vali_objects/utils/p2p_syncer.py index 1a01d5980..80478c814 100644 --- a/vali_objects/utils/p2p_syncer.py +++ b/vali_objects/utils/p2p_syncer.py @@ -32,6 +32,7 @@ def __init__(self, wallet=None, metagraph=None, is_testnet=None, shutdown_dict=N self.created_golden = False self.last_signal_sync_time_ms = 0 self.running_unit_tests = running_unit_tests + self.min_checkpoints = 1 if running_unit_tests else ValiConfig.MIN_CHECKPOINTS_RECEIVED def send_checkpoint_requests(self): """ @@ -84,7 +85,7 @@ def send_checkpoint_requests(self): bt.logging.info(f"{n_successful_checkpoints} responses succeeded. 
{n_failures} responses failed") - if (n_successful_checkpoints > 0 and self.is_testnet) or n_successful_checkpoints >= ValiConfig.MIN_CHECKPOINTS_RECEIVED: + if (n_successful_checkpoints > 0 and self.is_testnet) or n_successful_checkpoints >= self.min_checkpoints: # sort all our successful responses by validator_trust sorted_v_trust = sorted(hotkey_to_received_checkpoint.items(), key=lambda item: item[1][0], reverse=True) hotkey_to_received_checkpoint = {checkpoint[0]: checkpoint[1] for checkpoint in sorted_v_trust} @@ -122,8 +123,8 @@ def create_golden(self, trusted_checkpoints: dict) -> bool: else: bt.logging.info(f"Checkpoint from validator {hotkey} is stale with newest order timestamp {latest_order_ms}, {round((TimeUtil.now_in_millis() - latest_order_ms)/(1000 * 60 * 60))} hrs ago, Skipping.") - if len(valid_checkpoints) == 0: - bt.logging.info(f"All {len(trusted_checkpoints)} checkpoints are stale, unable to build golden.") + if len(valid_checkpoints) < self.min_checkpoints: + bt.logging.info(f"Only {len(valid_checkpoints)} checkpoints are not stale, unable to build golden. 
Min required: {self.min_checkpoints}") return False else: bt.logging.info(f"Building golden from [{len(valid_checkpoints)}/{len(trusted_checkpoints)}] up-to-date checkpoints.") @@ -161,8 +162,8 @@ def p2p_sync_challengeperiod(self, valid_checkpoints: dict): self.parse_checkpoint_challengeperiod(checkpoint, challengeperiod_testing_data, challengeperiod_success_data) threshold = self.consensus_threshold(len(valid_checkpoints)) - majority_testing = {hotkey for hotkey, times in challengeperiod_testing_data.items() if len(times) >= threshold} - majority_success = {hotkey for hotkey, times in challengeperiod_success_data.items() if len(times) >= threshold} + majority_testing = {hotkey for hotkey, times in challengeperiod_testing_data.items() if len(times) > threshold} + majority_success = {hotkey for hotkey, times in challengeperiod_success_data.items() if len(times) > threshold} for hotkey in majority_testing: challengeperiod_testing[hotkey] = statistics.median_low(challengeperiod_testing_data[hotkey]) @@ -213,14 +214,14 @@ def p2p_sync_positions(self, valid_checkpoints: dict): # get the set of position_uuids that appear in the majority of checkpoints positions_threshold = self.consensus_threshold(len(valid_checkpoints)) - majority_positions = {position_uuid for position_uuid, count in position_counts.items() if count >= positions_threshold} + majority_positions = {position_uuid for position_uuid, count in position_counts.items() if count > positions_threshold} seen_positions = set() seen_orders = set() for validator_hotkey, checkpoint in valid_checkpoints.items(): positions = checkpoint.get("positions", {}) for miner_hotkey, miner_positions in positions.items(): - if miner_counts[miner_hotkey] < positions_threshold: + if miner_counts[miner_hotkey] <= positions_threshold: continue # combinations where the position_uuid appears in the majority @@ -262,7 +263,7 @@ def construct_positions_uuid_in_majority(self, miner_positions: dict, majority_p # get the set of order_uuids 
that appear in the majority of positions for a position_uuid orders_threshold = self.consensus_threshold(position_counts[position_uuid]) - majority_orders = {order_uuid for order_uuid, count in order_counts[position_uuid].items() if count >= orders_threshold} + majority_orders = {order_uuid for order_uuid, count in order_counts[position_uuid].items() if count > orders_threshold} for order_uuid in order_counts[position_uuid].keys(): if order_uuid not in seen_orders: @@ -277,7 +278,7 @@ def construct_positions_uuid_in_majority(self, miner_positions: dict, majority_p seen_orders.update([o["order_uuid"] for o in orders]) continue - if len(orders) > self.consensus_threshold(position_counts[position_uuid], heuristic_match=True): + if len(orders) > self.consensus_threshold(position_counts[position_uuid]): bt.logging.info(f"Order {order_uuid} with Position {position_uuid} on miner {position['miner_hotkey']} matched with {[o['order_uuid'] for o in orders]}, adding back in") else: bt.logging.info(f"Order {order_uuid} with Position {position_uuid} only matched [{len(orders)}/{position_counts[position_uuid]}] times on miner {position['miner_hotkey']} with with {[o['order_uuid'] for o in orders]}. 
Skipping") @@ -471,7 +472,7 @@ def heuristic_resolve_positions(self, positions_matrix: dict, num_checkpoints: i continue matches = self.find_matching_positions(position, trade_pairs[trade_pair], resolved_position_uuids, validator_hotkey) - if (len(matches) > self.consensus_threshold(num_checkpoints, heuristic_match=True) and + if (len(matches) > self.consensus_threshold(num_checkpoints) and set([match["position_uuid"] for match in matches]).isdisjoint(seen_positions)): # median number of orders for matched positions median_order_count = len(matches[len(matches) // 2]["orders"]) @@ -484,7 +485,7 @@ def heuristic_resolve_positions(self, positions_matrix: dict, num_checkpoints: i goal_order_count = max(median_order_count, max_common_order_count) matches_with_goal_order_count = [p for p in matches if len(p["orders"]) == goal_order_count] - bt.logging.info(f"Miner hotkey {miner_hotkey} has matches {[p['position_uuid'] for p in matches]}. goal_order_count: {goal_order_count}. matches_with_goal_order_count: {matches_with_goal_order_count}.") + bt.logging.info(f"Miner hotkey {miner_hotkey} has matches {[p['position_uuid'] for p in matches]}. goal_order_count: {goal_order_count}. matches_with_goal_order_count: {[p['position_uuid'] for p in matches_with_goal_order_count]}.") matched_positions.append(matches_with_goal_order_count[0]) else: bt.logging.info(f"Position {position['position_uuid']} only matched [{len(matches)}/{num_checkpoints}] times on miner {position['miner_hotkey']} with matches {[p['position_uuid'] for p in matches]}. 
Skipping") @@ -531,14 +532,11 @@ def find_matching_positions(self, position: dict, trade_pair_validator_positions matched_positions.sort(key=lambda x: (-len(x["orders"]), x["position_uuid"])) return matched_positions - def consensus_threshold(self, total_items: int, heuristic_match: bool=False) -> int: + def consensus_threshold(self, total_items: int) -> int: """ threshold for including a position or order in the golden """ - if heuristic_match: - return math.floor(total_items / 2) - else: - return math.ceil(total_items / 2) + return math.floor(total_items / 2) def get_median_order(self, orders: List[dict], trade_pair: TradePair) -> Order: """ @@ -588,19 +586,20 @@ def sync_positions_with_cooldown(self): if not (47 < datetime_now.minute < 57): return else: - # Check if we are between 7:09 AM and 7:19 AM UTC - # Temp change time to 21:00 UTC so we can see the effects in shadow mode ASAP + # Check if we are between 1:19 AM and 1:29 AM UTC if not (datetime_now.hour == 1 and (18 < datetime_now.minute < 30)): return try: - bt.logging.info("Calling send_checkpoint_requests") + bt.logging.info("Sending checkpoint requests") self.golden = None self.send_checkpoint_requests() if self.created_golden: - bt.logging.info("Calling apply_golden") - # TODO guard sync_positions with the signal lock once we move on from shadow mode - self.sync_positions(True, candidate_data=self.golden) + bt.logging.info("Golden created. 
Syncing positions.") + with self.signal_sync_lock: + while self.n_orders_being_processed[0] > 0: + self.signal_sync_condition.wait() + self.sync_positions(False, candidate_data=self.golden) except Exception as e: bt.logging.error(f"Error sending checkpoint: {e}") bt.logging.error(traceback.format_exc()) @@ -612,4 +611,4 @@ def sync_positions_with_cooldown(self): position_syncer = P2PSyncer(is_testnet=True) position_syncer.send_checkpoint_requests() if position_syncer.created_golden: - position_syncer.sync_positions(True, candidate_data=position_syncer.golden) + position_syncer.sync_positions(False, candidate_data=position_syncer.golden) diff --git a/vali_objects/utils/validator_sync_base.py b/vali_objects/utils/validator_sync_base.py index 2c53cd912..1f994c904 100644 --- a/vali_objects/utils/validator_sync_base.py +++ b/vali_objects/utils/validator_sync_base.py @@ -179,23 +179,26 @@ def write_modifications(self, position_to_sync_status, stats): # self.debug_print_pos(position) # print('---status', sync_status) + allow_writes = (not self.is_mothership or + (self.is_mothership and (stats['deleted'] == 0 or stats['deleted'] == stats['inserted']))) + bt.logging.info(f'allow_writes: {allow_writes} stats: {stats}') # Deletions happen first for position, sync_status in position_to_sync_status.items(): if sync_status == PositionSyncResult.DELETED: deleted -= 1 - if not self.is_mothership: + if allow_writes: self.position_manager.delete_position_from_disk(position) # Updates happen next # First close out contradicting positions that happen if a validator is left in a bad state - #for position, sync_status in position_to_sync_status.items(): - # if sync_status == PositionSyncResult.UPDATED or sync_status == PositionSyncResult.NOTHING: - # if not self.is_mothership: - # if position.is_closed_position: - # self.position_manager.delete_open_position_if_exists(position) + # for position, sync_status in position_to_sync_status.items(): + # if sync_status == 
PositionSyncResult.UPDATED or sync_status == PositionSyncResult.NOTHING: + # if allow_writes: + # if position.is_closed_position: + # self.position_manager.delete_open_position_if_exists(position) for position, sync_status in position_to_sync_status.items(): if sync_status == PositionSyncResult.UPDATED: - if not self.is_mothership: + if allow_writes: positions = self.split_position_on_flat(position) for p in positions: if p.is_open_position: @@ -206,7 +209,7 @@ def write_modifications(self, position_to_sync_status, stats): for position, sync_status in position_to_sync_status.items(): if sync_status == PositionSyncResult.INSERTED: inserted -= 1 - if not self.is_mothership: + if allow_writes: positions = self.split_position_on_flat(position) for p in positions: if p.is_open_position: