From 6570c8e0565abcea4ac1143b8c12ed667cd90d2d Mon Sep 17 00:00:00 2001 From: Jordan Bonilla Date: Sun, 28 Jul 2024 23:22:42 -0400 Subject: [PATCH 1/9] p2p sync golive --- neurons/validator.py | 2 +- vali_objects/utils/p2p_syncer.py | 12 +++---- vali_objects/utils/validator_sync_base.py | 42 +++++++---------------- 3 files changed, 20 insertions(+), 36 deletions(-) diff --git a/neurons/validator.py b/neurons/validator.py index 74a9bbe81..8594519bb 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -96,7 +96,7 @@ def __init__(self): self.config = self.get_config() # Use the getattr function to safely get the autosync attribute with a default of False if not found. - self.auto_sync = getattr(self.config, 'autosync', False) and 'mothership' not in ValiUtils.get_secrets() + self.auto_sync = False self.is_mainnet = self.config.netuid == 8 # Ensure the directory for logging exists, else create one. if not os.path.exists(self.config.full_path): diff --git a/vali_objects/utils/p2p_syncer.py b/vali_objects/utils/p2p_syncer.py index 1a01d5980..fa28cf674 100644 --- a/vali_objects/utils/p2p_syncer.py +++ b/vali_objects/utils/p2p_syncer.py @@ -588,9 +588,7 @@ def sync_positions_with_cooldown(self): if not (47 < datetime_now.minute < 57): return else: - # Check if we are between 7:09 AM and 7:19 AM UTC - # Temp change time to 21:00 UTC so we can see the effects in shadow mode ASAP - if not (datetime_now.hour == 1 and (18 < datetime_now.minute < 30)): + if not (datetime_now.hour == 21 and (18 < datetime_now.minute < 30)): return try: @@ -599,8 +597,10 @@ def sync_positions_with_cooldown(self): self.send_checkpoint_requests() if self.created_golden: bt.logging.info("Calling apply_golden") - # TODO guard sync_positions with the signal lock once we move on from shadow mode - self.sync_positions(True, candidate_data=self.golden) + with self.signal_sync_lock: + while self.n_orders_being_processed[0] > 0: + self.signal_sync_condition.wait() + self.sync_positions(False, 
candidate_data=self.golden) except Exception as e: bt.logging.error(f"Error sending checkpoint: {e}") bt.logging.error(traceback.format_exc()) @@ -612,4 +612,4 @@ def sync_positions_with_cooldown(self): position_syncer = P2PSyncer(is_testnet=True) position_syncer.send_checkpoint_requests() if position_syncer.created_golden: - position_syncer.sync_positions(True, candidate_data=position_syncer.golden) + position_syncer.sync_positions(False, candidate_data=position_syncer.golden) \ No newline at end of file diff --git a/vali_objects/utils/validator_sync_base.py b/vali_objects/utils/validator_sync_base.py index 2c53cd912..6301e32b6 100644 --- a/vali_objects/utils/validator_sync_base.py +++ b/vali_objects/utils/validator_sync_base.py @@ -168,50 +168,34 @@ def write_modifications(self, position_to_sync_status, stats): kept_and_matched = stats['kept'] + stats['matched'] deleted = stats['deleted'] inserted = stats['inserted'] - - # handle having multiple open positions for a hotkey - # close the older open position - prev_open_position = None - - #for position, sync_status in position_to_sync_status.items(): - # position_debug_sting = f'---debug printing pos to ss: {position.trade_pair.trade_pair_id} n_orders {len(position.orders)}' - # print(position_debug_sting) - # self.debug_print_pos(position) - # print('---status', sync_status) - + allow_writes = (not self.is_mothership or + (self.is_mothership and (stats['deleted'] == 0 or stats['deleted'] == stats['inserted']))) + bt.logging.info(f'allow_writes: {allow_writes} stats: {stats}') # Deletions happen first for position, sync_status in position_to_sync_status.items(): if sync_status == PositionSyncResult.DELETED: deleted -= 1 - if not self.is_mothership: + if allow_writes: self.position_manager.delete_position_from_disk(position) # Updates happen next # First close out contradicting positions that happen if a validator is left in a bad state - #for position, sync_status in position_to_sync_status.items(): - # if 
sync_status == PositionSyncResult.UPDATED or sync_status == PositionSyncResult.NOTHING: - # if not self.is_mothership: - # if position.is_closed_position: - # self.position_manager.delete_open_position_if_exists(position) + for position, sync_status in position_to_sync_status.items(): + if sync_status == PositionSyncResult.UPDATED or sync_status == PositionSyncResult.NOTHING: + if allow_writes: + if position.is_closed_position: + self.position_manager.delete_open_position_if_exists(position) for position, sync_status in position_to_sync_status.items(): if sync_status == PositionSyncResult.UPDATED: - if not self.is_mothership: - positions = self.split_position_on_flat(position) - for p in positions: - if p.is_open_position: - prev_open_position = self.close_older_open_position(p, prev_open_position) - self.position_manager.overwrite_position_on_disk(p) + if allow_writes: + self.position_manager.save_miner_position_to_disk(position, delete_open_position_if_exists=True) kept_and_matched -= 1 # Insertions happen last so that there is no double open position issue for position, sync_status in position_to_sync_status.items(): if sync_status == PositionSyncResult.INSERTED: inserted -= 1 - if not self.is_mothership: - positions = self.split_position_on_flat(position) - for p in positions: - if p.is_open_position: - prev_open_position = self.close_older_open_position(p, prev_open_position) - self.position_manager.overwrite_position_on_disk(p) + if allow_writes: + self.position_manager.save_miner_position_to_disk(position, delete_open_position_if_exists=False) for position, sync_status in position_to_sync_status.items(): if sync_status == PositionSyncResult.NOTHING: kept_and_matched -= 1 From 97aad8e5f43dff28c65fc15410473a521a4b5ac2 Mon Sep 17 00:00:00 2001 From: sli-tao Date: Tue, 17 Sep 2024 11:32:10 -0700 Subject: [PATCH 2/9] rebase --- vali_objects/utils/p2p_syncer.py | 3 ++- vali_objects/utils/validator_sync_base.py | 33 ++++++++++++++++++----- 2 files changed, 28 
insertions(+), 8 deletions(-) diff --git a/vali_objects/utils/p2p_syncer.py b/vali_objects/utils/p2p_syncer.py index fa28cf674..235b3f4b3 100644 --- a/vali_objects/utils/p2p_syncer.py +++ b/vali_objects/utils/p2p_syncer.py @@ -588,7 +588,8 @@ def sync_positions_with_cooldown(self): if not (47 < datetime_now.minute < 57): return else: - if not (datetime_now.hour == 21 and (18 < datetime_now.minute < 30)): + # Check if we are between 7:19 AM and 7:29 AM UTC + if not (datetime_now.hour == 1 and (18 < datetime_now.minute < 30)): return try: diff --git a/vali_objects/utils/validator_sync_base.py b/vali_objects/utils/validator_sync_base.py index 6301e32b6..1f994c904 100644 --- a/vali_objects/utils/validator_sync_base.py +++ b/vali_objects/utils/validator_sync_base.py @@ -168,6 +168,17 @@ def write_modifications(self, position_to_sync_status, stats): kept_and_matched = stats['kept'] + stats['matched'] deleted = stats['deleted'] inserted = stats['inserted'] + + # handle having multiple open positions for a hotkey + # close the older open position + prev_open_position = None + + #for position, sync_status in position_to_sync_status.items(): + # position_debug_sting = f'---debug printing pos to ss: {position.trade_pair.trade_pair_id} n_orders {len(position.orders)}' + # print(position_debug_sting) + # self.debug_print_pos(position) + # print('---status', sync_status) + allow_writes = (not self.is_mothership or (self.is_mothership and (stats['deleted'] == 0 or stats['deleted'] == stats['inserted']))) bt.logging.info(f'allow_writes: {allow_writes} stats: {stats}') @@ -180,22 +191,30 @@ def write_modifications(self, position_to_sync_status, stats): # Updates happen next # First close out contradicting positions that happen if a validator is left in a bad state - for position, sync_status in position_to_sync_status.items(): - if sync_status == PositionSyncResult.UPDATED or sync_status == PositionSyncResult.NOTHING: - if allow_writes: - if position.is_closed_position: - 
self.position_manager.delete_open_position_if_exists(position) + # for position, sync_status in position_to_sync_status.items(): + # if sync_status == PositionSyncResult.UPDATED or sync_status == PositionSyncResult.NOTHING: + # if allow_writes: + # if position.is_closed_position: + # self.position_manager.delete_open_position_if_exists(position) for position, sync_status in position_to_sync_status.items(): if sync_status == PositionSyncResult.UPDATED: if allow_writes: - self.position_manager.save_miner_position_to_disk(position, delete_open_position_if_exists=True) + positions = self.split_position_on_flat(position) + for p in positions: + if p.is_open_position: + prev_open_position = self.close_older_open_position(p, prev_open_position) + self.position_manager.overwrite_position_on_disk(p) kept_and_matched -= 1 # Insertions happen last so that there is no double open position issue for position, sync_status in position_to_sync_status.items(): if sync_status == PositionSyncResult.INSERTED: inserted -= 1 if allow_writes: - self.position_manager.save_miner_position_to_disk(position, delete_open_position_if_exists=False) + positions = self.split_position_on_flat(position) + for p in positions: + if p.is_open_position: + prev_open_position = self.close_older_open_position(p, prev_open_position) + self.position_manager.overwrite_position_on_disk(p) for position, sync_status in position_to_sync_status.items(): if sync_status == PositionSyncResult.NOTHING: kept_and_matched -= 1 From 5545dd7e8eeaac11ee1574071d25b08114c2a5b0 Mon Sep 17 00:00:00 2001 From: sli-tao Date: Tue, 17 Sep 2024 11:56:00 -0700 Subject: [PATCH 3/9] require min valid checkpoints to create golden, update logging --- meta/meta.json | 2 +- vali_objects/utils/p2p_syncer.py | 44 +++++++++++++++++--------------- 2 files changed, 25 insertions(+), 21 deletions(-) diff --git a/meta/meta.json b/meta/meta.json index d61abf6c1..7c4772e75 100644 --- a/meta/meta.json +++ b/meta/meta.json @@ -1,3 +1,3 @@ { - 
"subnet_version": "4.2.11" + "subnet_version": "4.3.0" } diff --git a/vali_objects/utils/p2p_syncer.py b/vali_objects/utils/p2p_syncer.py index 235b3f4b3..a2592ef1e 100644 --- a/vali_objects/utils/p2p_syncer.py +++ b/vali_objects/utils/p2p_syncer.py @@ -92,11 +92,11 @@ def send_checkpoint_requests(self): bt.logging.info("Received enough checkpoints, now creating golden.") self.created_golden = self.create_golden(hotkey_to_received_checkpoint) else: - bt.logging.info("Not enough checkpoints received to create a golden.") + bt.logging.error("Not enough checkpoints received to create a golden.") self.created_golden = False except Exception as e: - bt.logging.info(f"Error generating golden with error [{e}]") + bt.logging.error(f"Error generating golden with error [{e}]") def create_golden(self, trusted_checkpoints: dict) -> bool: """ @@ -122,15 +122,15 @@ def create_golden(self, trusted_checkpoints: dict) -> bool: else: bt.logging.info(f"Checkpoint from validator {hotkey} is stale with newest order timestamp {latest_order_ms}, {round((TimeUtil.now_in_millis() - latest_order_ms)/(1000 * 60 * 60))} hrs ago, Skipping.") - if len(valid_checkpoints) == 0: - bt.logging.info(f"All {len(trusted_checkpoints)} checkpoints are stale, unable to build golden.") + if len(valid_checkpoints) < ValiConfig.MIN_CHECKPOINTS_RECEIVED: + bt.logging.error(f"Only {len(valid_checkpoints)} checkpoints are not stale, unable to build golden. 
Min required: {ValiConfig.MIN_CHECKPOINTS_RECEIVED}") return False else: bt.logging.info(f"Building golden from [{len(valid_checkpoints)}/{len(trusted_checkpoints)}] up-to-date checkpoints.") for hotkey, chk in valid_checkpoints.items(): - bt.logging.info(f"{hotkey} sent checkpoint {self.checkpoint_summary(chk)}") - bt.logging.info("--------------------------------------------------") + bt.logging.debug(f"{hotkey} sent checkpoint {self.checkpoint_summary(chk)}") + bt.logging.debug("--------------------------------------------------") golden_eliminations = position_manager.get_eliminations_from_disk() golden_positions = self.p2p_sync_positions(valid_checkpoints) @@ -229,13 +229,17 @@ def p2p_sync_positions(self, valid_checkpoints: dict): # combinations where the position_uuid does not appear in the majority, instead we use a heuristic match to combine positions for position in self.heuristic_resolve_positions(positions_matrix, len(valid_checkpoints), seen_positions): - bt.logging.info(f"Position {position['position_uuid']} on miner {position['miner_hotkey']} matched, adding back in") + bt.logging.debug(f"Position {position['position_uuid']} on miner {position['miner_hotkey']} matched, adding back in") miner_hotkey = position["miner_hotkey"] golden_positions[miner_hotkey]["positions"].append(position) - # convert defaultdict to dict - return {miner: dict(golden_positions[miner]) for miner in golden_positions} - + # Construct golden and convert defaultdict to dict + self.golden = {"created_timestamp_ms": TimeUtil.now_in_millis(), + "hard_snap_cutoff_ms": TimeUtil.now_in_millis() - 1000 * 60 * 15, + "eliminations": golden_eliminations, + "positions": {miner: dict(golden_positions[miner]) for miner in golden_positions}} + bt.logging.success(f"Created golden checkpoint: {self.checkpoint_summary(self.golden)}") + return True def construct_positions_uuid_in_majority(self, miner_positions: dict, majority_positions: Set[str], seen_positions: Set[str], seen_orders: Set[str], 
position_counts: dict, order_counts: dict, order_data: dict, orders_matrix: dict, validator_hotkey: str) -> List[dict]: """ @@ -278,9 +282,9 @@ def construct_positions_uuid_in_majority(self, miner_positions: dict, majority_p continue if len(orders) > self.consensus_threshold(position_counts[position_uuid], heuristic_match=True): - bt.logging.info(f"Order {order_uuid} with Position {position_uuid} on miner {position['miner_hotkey']} matched with {[o['order_uuid'] for o in orders]}, adding back in") + bt.logging.debug(f"Order {order_uuid} with Position {position_uuid} on miner {position['miner_hotkey']} matched with {[o['order_uuid'] for o in orders]}, adding back in") else: - bt.logging.info(f"Order {order_uuid} with Position {position_uuid} only matched [{len(orders)}/{position_counts[position_uuid]}] times on miner {position['miner_hotkey']} with with {[o['order_uuid'] for o in orders]}. Skipping") + bt.logging.debug(f"Order {order_uuid} with Position {position_uuid} only matched [{len(orders)}/{position_counts[position_uuid]}] times on miner {position['miner_hotkey']} with with {[o['order_uuid'] for o in orders]}. 
Skipping") continue trade_pair = TradePair.from_trade_pair_id(position["trade_pair"][0]) @@ -294,7 +298,7 @@ def construct_positions_uuid_in_majority(self, miner_positions: dict, majority_p position_dict = json.loads(new_position.to_json_string()) uuid_matched_positions.append(position_dict) except ValueError as v: - bt.logging.info(f"Miner [{new_position.miner_hotkey}] Position [{new_position.position_uuid}] Orders {[o.order_uuid for o in new_position.orders]} ValueError {v}") + bt.logging.debug(f"Miner [{new_position.miner_hotkey}] Position [{new_position.position_uuid}] Orders {[o.order_uuid for o in new_position.orders]} ValueError {v}") return uuid_matched_positions def find_matching_orders(self, order: dict, validator_to_orders: dict, resolved_orders: Set[str]) -> List[dict] | None: @@ -446,10 +450,10 @@ def find_legacy_miners(self, num_checkpoints: int, order_counts: dict, miner_to_ legacy_miners.add(miner_hotkey) elif newest_unique_order_timestamp == newest_order_timestamp: legacy_miner_candidates.add(miner_hotkey) - bt.logging.info( + bt.logging.debug( f"Miner {miner_hotkey} has [{(len(uuids['positions']) - num_repeated_pos)}/{len(uuids['positions'])} legacy positions, {(len(uuids['orders']) - num_repeated_orders)}/{len(uuids['orders'])} legacy orders]. 
Newest legacy order {newest_unique_order_uuid} at timestamp {newest_unique_order_timestamp}") else: - bt.logging.info(f"Miner {miner_hotkey} has 0 legacy positions or orders") + bt.logging.debug(f"Miner {miner_hotkey} has 0 legacy positions or orders") bt.logging.info(f"legacy_miners: {legacy_miners}") bt.logging.info(f"legacy_miner_candidates: {legacy_miner_candidates}") return legacy_miners @@ -484,10 +488,10 @@ def heuristic_resolve_positions(self, positions_matrix: dict, num_checkpoints: i goal_order_count = max(median_order_count, max_common_order_count) matches_with_goal_order_count = [p for p in matches if len(p["orders"]) == goal_order_count] - bt.logging.info(f"Miner hotkey {miner_hotkey} has matches {[p['position_uuid'] for p in matches]}. goal_order_count: {goal_order_count}. matches_with_goal_order_count: {matches_with_goal_order_count}.") + bt.logging.debug(f"Miner hotkey {miner_hotkey} has matches {[p['position_uuid'] for p in matches]}. goal_order_count: {goal_order_count}. matches_with_goal_order_count: {matches_with_goal_order_count}.") matched_positions.append(matches_with_goal_order_count[0]) else: - bt.logging.info(f"Position {position['position_uuid']} only matched [{len(matches)}/{num_checkpoints}] times on miner {position['miner_hotkey']} with matches {[p['position_uuid'] for p in matches]}. Skipping") + bt.logging.debug(f"Position {position['position_uuid']} only matched [{len(matches)}/{num_checkpoints}] times on miner {position['miner_hotkey']} with matches {[p['position_uuid'] for p in matches]}. Skipping") seen_positions.update([p["position_uuid"] for p in matches]) return matched_positions @@ -593,11 +597,11 @@ def sync_positions_with_cooldown(self): return try: - bt.logging.info("Calling send_checkpoint_requests") + bt.logging.info("Sending checkpoint requests") self.golden = None self.send_checkpoint_requests() if self.created_golden: - bt.logging.info("Calling apply_golden") + bt.logging.info("Golden created. 
Syncing positions.") with self.signal_sync_lock: while self.n_orders_being_processed[0] > 0: self.signal_sync_condition.wait() @@ -613,4 +617,4 @@ def sync_positions_with_cooldown(self): position_syncer = P2PSyncer(is_testnet=True) position_syncer.send_checkpoint_requests() if position_syncer.created_golden: - position_syncer.sync_positions(False, candidate_data=position_syncer.golden) \ No newline at end of file + position_syncer.sync_positions(False, candidate_data=position_syncer.golden) From 55f128ede00c4b1eb402b8022eacc502d9207cb0 Mon Sep 17 00:00:00 2001 From: sli-tao Date: Tue, 17 Sep 2024 12:08:33 -0700 Subject: [PATCH 4/9] fix min checkpoints for unit tests --- vali_objects/utils/p2p_syncer.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/vali_objects/utils/p2p_syncer.py b/vali_objects/utils/p2p_syncer.py index a2592ef1e..dbab3d604 100644 --- a/vali_objects/utils/p2p_syncer.py +++ b/vali_objects/utils/p2p_syncer.py @@ -31,7 +31,7 @@ def __init__(self, wallet=None, metagraph=None, is_testnet=None, shutdown_dict=N self.is_testnet = is_testnet self.created_golden = False self.last_signal_sync_time_ms = 0 - self.running_unit_tests = running_unit_tests + self.min_checkpoints = 1 if running_unit_tests else ValiConfig.MIN_CHECKPOINTS_RECEIVED def send_checkpoint_requests(self): """ @@ -84,7 +84,7 @@ def send_checkpoint_requests(self): bt.logging.info(f"{n_successful_checkpoints} responses succeeded. 
{n_failures} responses failed") - if (n_successful_checkpoints > 0 and self.is_testnet) or n_successful_checkpoints >= ValiConfig.MIN_CHECKPOINTS_RECEIVED: + if (n_successful_checkpoints > 0 and self.is_testnet) or n_successful_checkpoints >= self.min_checkpoints: # sort all our successful responses by validator_trust sorted_v_trust = sorted(hotkey_to_received_checkpoint.items(), key=lambda item: item[1][0], reverse=True) hotkey_to_received_checkpoint = {checkpoint[0]: checkpoint[1] for checkpoint in sorted_v_trust} @@ -122,8 +122,8 @@ def create_golden(self, trusted_checkpoints: dict) -> bool: else: bt.logging.info(f"Checkpoint from validator {hotkey} is stale with newest order timestamp {latest_order_ms}, {round((TimeUtil.now_in_millis() - latest_order_ms)/(1000 * 60 * 60))} hrs ago, Skipping.") - if len(valid_checkpoints) < ValiConfig.MIN_CHECKPOINTS_RECEIVED: - bt.logging.error(f"Only {len(valid_checkpoints)} checkpoints are not stale, unable to build golden. Min required: {ValiConfig.MIN_CHECKPOINTS_RECEIVED}") + if len(valid_checkpoints) < self.min_checkpoints: + bt.logging.error(f"Only {len(valid_checkpoints)} checkpoints are not stale, unable to build golden. 
Min required: {self.min_checkpoints}") return False else: bt.logging.info(f"Building golden from [{len(valid_checkpoints)}/{len(trusted_checkpoints)}] up-to-date checkpoints.") From 537d8cb96abbeb0af3cb77f35056a95734b24f37 Mon Sep 17 00:00:00 2001 From: sli-tao Date: Wed, 25 Sep 2024 15:53:57 -0700 Subject: [PATCH 5/9] undo logging type change --- vali_objects/utils/p2p_syncer.py | 37 +++++++++++++++----------------- 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/vali_objects/utils/p2p_syncer.py b/vali_objects/utils/p2p_syncer.py index dbab3d604..fd2d4c20b 100644 --- a/vali_objects/utils/p2p_syncer.py +++ b/vali_objects/utils/p2p_syncer.py @@ -31,6 +31,7 @@ def __init__(self, wallet=None, metagraph=None, is_testnet=None, shutdown_dict=N self.is_testnet = is_testnet self.created_golden = False self.last_signal_sync_time_ms = 0 + self.running_unit_tests = running_unit_tests self.min_checkpoints = 1 if running_unit_tests else ValiConfig.MIN_CHECKPOINTS_RECEIVED def send_checkpoint_requests(self): @@ -92,11 +93,11 @@ def send_checkpoint_requests(self): bt.logging.info("Received enough checkpoints, now creating golden.") self.created_golden = self.create_golden(hotkey_to_received_checkpoint) else: - bt.logging.error("Not enough checkpoints received to create a golden.") + bt.logging.info("Not enough checkpoints received to create a golden.") self.created_golden = False except Exception as e: - bt.logging.error(f"Error generating golden with error [{e}]") + bt.logging.info(f"Error generating golden with error [{e}]") def create_golden(self, trusted_checkpoints: dict) -> bool: """ @@ -123,14 +124,14 @@ def create_golden(self, trusted_checkpoints: dict) -> bool: bt.logging.info(f"Checkpoint from validator {hotkey} is stale with newest order timestamp {latest_order_ms}, {round((TimeUtil.now_in_millis() - latest_order_ms)/(1000 * 60 * 60))} hrs ago, Skipping.") if len(valid_checkpoints) < self.min_checkpoints: - bt.logging.error(f"Only 
{len(valid_checkpoints)} checkpoints are not stale, unable to build golden. Min required: {self.min_checkpoints}") + bt.logging.info(f"Only {len(valid_checkpoints)} checkpoints are not stale, unable to build golden. Min required: {self.min_checkpoints}") return False else: bt.logging.info(f"Building golden from [{len(valid_checkpoints)}/{len(trusted_checkpoints)}] up-to-date checkpoints.") for hotkey, chk in valid_checkpoints.items(): - bt.logging.debug(f"{hotkey} sent checkpoint {self.checkpoint_summary(chk)}") - bt.logging.debug("--------------------------------------------------") + bt.logging.info(f"{hotkey} sent checkpoint {self.checkpoint_summary(chk)}") + bt.logging.info("--------------------------------------------------") golden_eliminations = position_manager.get_eliminations_from_disk() golden_positions = self.p2p_sync_positions(valid_checkpoints) @@ -229,17 +230,13 @@ def p2p_sync_positions(self, valid_checkpoints: dict): # combinations where the position_uuid does not appear in the majority, instead we use a heuristic match to combine positions for position in self.heuristic_resolve_positions(positions_matrix, len(valid_checkpoints), seen_positions): - bt.logging.debug(f"Position {position['position_uuid']} on miner {position['miner_hotkey']} matched, adding back in") + bt.logging.info(f"Position {position['position_uuid']} on miner {position['miner_hotkey']} matched, adding back in") miner_hotkey = position["miner_hotkey"] golden_positions[miner_hotkey]["positions"].append(position) - # Construct golden and convert defaultdict to dict - self.golden = {"created_timestamp_ms": TimeUtil.now_in_millis(), - "hard_snap_cutoff_ms": TimeUtil.now_in_millis() - 1000 * 60 * 15, - "eliminations": golden_eliminations, - "positions": {miner: dict(golden_positions[miner]) for miner in golden_positions}} - bt.logging.success(f"Created golden checkpoint: {self.checkpoint_summary(self.golden)}") - return True + # convert defaultdict to dict + return {miner: 
dict(golden_positions[miner]) for miner in golden_positions} + def construct_positions_uuid_in_majority(self, miner_positions: dict, majority_positions: Set[str], seen_positions: Set[str], seen_orders: Set[str], position_counts: dict, order_counts: dict, order_data: dict, orders_matrix: dict, validator_hotkey: str) -> List[dict]: """ @@ -282,9 +279,9 @@ def construct_positions_uuid_in_majority(self, miner_positions: dict, majority_p continue if len(orders) > self.consensus_threshold(position_counts[position_uuid], heuristic_match=True): - bt.logging.debug(f"Order {order_uuid} with Position {position_uuid} on miner {position['miner_hotkey']} matched with {[o['order_uuid'] for o in orders]}, adding back in") + bt.logging.info(f"Order {order_uuid} with Position {position_uuid} on miner {position['miner_hotkey']} matched with {[o['order_uuid'] for o in orders]}, adding back in") else: - bt.logging.debug(f"Order {order_uuid} with Position {position_uuid} only matched [{len(orders)}/{position_counts[position_uuid]}] times on miner {position['miner_hotkey']} with with {[o['order_uuid'] for o in orders]}. Skipping") + bt.logging.info(f"Order {order_uuid} with Position {position_uuid} only matched [{len(orders)}/{position_counts[position_uuid]}] times on miner {position['miner_hotkey']} with with {[o['order_uuid'] for o in orders]}. 
Skipping") continue trade_pair = TradePair.from_trade_pair_id(position["trade_pair"][0]) @@ -298,7 +295,7 @@ def construct_positions_uuid_in_majority(self, miner_positions: dict, majority_p position_dict = json.loads(new_position.to_json_string()) uuid_matched_positions.append(position_dict) except ValueError as v: - bt.logging.debug(f"Miner [{new_position.miner_hotkey}] Position [{new_position.position_uuid}] Orders {[o.order_uuid for o in new_position.orders]} ValueError {v}") + bt.logging.info(f"Miner [{new_position.miner_hotkey}] Position [{new_position.position_uuid}] Orders {[o.order_uuid for o in new_position.orders]} ValueError {v}") return uuid_matched_positions def find_matching_orders(self, order: dict, validator_to_orders: dict, resolved_orders: Set[str]) -> List[dict] | None: @@ -450,10 +447,10 @@ def find_legacy_miners(self, num_checkpoints: int, order_counts: dict, miner_to_ legacy_miners.add(miner_hotkey) elif newest_unique_order_timestamp == newest_order_timestamp: legacy_miner_candidates.add(miner_hotkey) - bt.logging.debug( + bt.logging.info( f"Miner {miner_hotkey} has [{(len(uuids['positions']) - num_repeated_pos)}/{len(uuids['positions'])} legacy positions, {(len(uuids['orders']) - num_repeated_orders)}/{len(uuids['orders'])} legacy orders]. 
Newest legacy order {newest_unique_order_uuid} at timestamp {newest_unique_order_timestamp}") else: - bt.logging.debug(f"Miner {miner_hotkey} has 0 legacy positions or orders") + bt.logging.info(f"Miner {miner_hotkey} has 0 legacy positions or orders") bt.logging.info(f"legacy_miners: {legacy_miners}") bt.logging.info(f"legacy_miner_candidates: {legacy_miner_candidates}") return legacy_miners @@ -488,10 +485,10 @@ def heuristic_resolve_positions(self, positions_matrix: dict, num_checkpoints: i goal_order_count = max(median_order_count, max_common_order_count) matches_with_goal_order_count = [p for p in matches if len(p["orders"]) == goal_order_count] - bt.logging.debug(f"Miner hotkey {miner_hotkey} has matches {[p['position_uuid'] for p in matches]}. goal_order_count: {goal_order_count}. matches_with_goal_order_count: {matches_with_goal_order_count}.") + bt.logging.info(f"Miner hotkey {miner_hotkey} has matches {[p['position_uuid'] for p in matches]}. goal_order_count: {goal_order_count}. matches_with_goal_order_count: {matches_with_goal_order_count}.") matched_positions.append(matches_with_goal_order_count[0]) else: - bt.logging.debug(f"Position {position['position_uuid']} only matched [{len(matches)}/{num_checkpoints}] times on miner {position['miner_hotkey']} with matches {[p['position_uuid'] for p in matches]}. Skipping") + bt.logging.info(f"Position {position['position_uuid']} only matched [{len(matches)}/{num_checkpoints}] times on miner {position['miner_hotkey']} with matches {[p['position_uuid'] for p in matches]}. 
Skipping") seen_positions.update([p["position_uuid"] for p in matches]) return matched_positions From 2ca9548cd2bdbab346c34d9c338d7a996dda0548 Mon Sep 17 00:00:00 2001 From: sli-tao Date: Wed, 25 Sep 2024 15:54:43 -0700 Subject: [PATCH 6/9] raise threshold for uuid match --- vali_objects/utils/p2p_syncer.py | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/vali_objects/utils/p2p_syncer.py b/vali_objects/utils/p2p_syncer.py index fd2d4c20b..f3c21fea7 100644 --- a/vali_objects/utils/p2p_syncer.py +++ b/vali_objects/utils/p2p_syncer.py @@ -162,8 +162,8 @@ def p2p_sync_challengeperiod(self, valid_checkpoints: dict): self.parse_checkpoint_challengeperiod(checkpoint, challengeperiod_testing_data, challengeperiod_success_data) threshold = self.consensus_threshold(len(valid_checkpoints)) - majority_testing = {hotkey for hotkey, times in challengeperiod_testing_data.items() if len(times) >= threshold} - majority_success = {hotkey for hotkey, times in challengeperiod_success_data.items() if len(times) >= threshold} + majority_testing = {hotkey for hotkey, times in challengeperiod_testing_data.items() if len(times) > threshold} + majority_success = {hotkey for hotkey, times in challengeperiod_success_data.items() if len(times) > threshold} for hotkey in majority_testing: challengeperiod_testing[hotkey] = statistics.median_low(challengeperiod_testing_data[hotkey]) @@ -214,7 +214,7 @@ def p2p_sync_positions(self, valid_checkpoints: dict): # get the set of position_uuids that appear in the majority of checkpoints positions_threshold = self.consensus_threshold(len(valid_checkpoints)) - majority_positions = {position_uuid for position_uuid, count in position_counts.items() if count >= positions_threshold} + majority_positions = {position_uuid for position_uuid, count in position_counts.items() if count > positions_threshold} seen_positions = set() seen_orders = set() @@ -263,7 +263,7 @@ def construct_positions_uuid_in_majority(self, 
miner_positions: dict, majority_p # get the set of order_uuids that appear in the majority of positions for a position_uuid orders_threshold = self.consensus_threshold(position_counts[position_uuid]) - majority_orders = {order_uuid for order_uuid, count in order_counts[position_uuid].items() if count >= orders_threshold} + majority_orders = {order_uuid for order_uuid, count in order_counts[position_uuid].items() if count > orders_threshold} for order_uuid in order_counts[position_uuid].keys(): if order_uuid not in seen_orders: @@ -278,7 +278,7 @@ def construct_positions_uuid_in_majority(self, miner_positions: dict, majority_p seen_orders.update([o["order_uuid"] for o in orders]) continue - if len(orders) > self.consensus_threshold(position_counts[position_uuid], heuristic_match=True): + if len(orders) > self.consensus_threshold(position_counts[position_uuid]): bt.logging.info(f"Order {order_uuid} with Position {position_uuid} on miner {position['miner_hotkey']} matched with {[o['order_uuid'] for o in orders]}, adding back in") else: bt.logging.info(f"Order {order_uuid} with Position {position_uuid} only matched [{len(orders)}/{position_counts[position_uuid]}] times on miner {position['miner_hotkey']} with with {[o['order_uuid'] for o in orders]}. 
Skipping") @@ -472,7 +472,7 @@ def heuristic_resolve_positions(self, positions_matrix: dict, num_checkpoints: i continue matches = self.find_matching_positions(position, trade_pairs[trade_pair], resolved_position_uuids, validator_hotkey) - if (len(matches) > self.consensus_threshold(num_checkpoints, heuristic_match=True) and + if (len(matches) > self.consensus_threshold(num_checkpoints) and set([match["position_uuid"] for match in matches]).isdisjoint(seen_positions)): # median number of orders for matched positions median_order_count = len(matches[len(matches) // 2]["orders"]) @@ -485,7 +485,7 @@ def heuristic_resolve_positions(self, positions_matrix: dict, num_checkpoints: i goal_order_count = max(median_order_count, max_common_order_count) matches_with_goal_order_count = [p for p in matches if len(p["orders"]) == goal_order_count] - bt.logging.info(f"Miner hotkey {miner_hotkey} has matches {[p['position_uuid'] for p in matches]}. goal_order_count: {goal_order_count}. matches_with_goal_order_count: {matches_with_goal_order_count}.") + bt.logging.info(f"Miner hotkey {miner_hotkey} has matches {[p['position_uuid'] for p in matches]}. goal_order_count: {goal_order_count}. matches_with_goal_order_count: {[p['position_uuid'] for p in matches_with_goal_order_count]}.") matched_positions.append(matches_with_goal_order_count[0]) else: bt.logging.info(f"Position {position['position_uuid']} only matched [{len(matches)}/{num_checkpoints}] times on miner {position['miner_hotkey']} with matches {[p['position_uuid'] for p in matches]}. 
Skipping") @@ -532,14 +532,11 @@ def find_matching_positions(self, position: dict, trade_pair_validator_positions matched_positions.sort(key=lambda x: (-len(x["orders"]), x["position_uuid"])) return matched_positions - def consensus_threshold(self, total_items: int, heuristic_match: bool=False) -> int: + def consensus_threshold(self, total_items: int) -> int: """ threshold for including a position or order in the golden """ - if heuristic_match: - return math.floor(total_items / 2) - else: - return math.ceil(total_items / 2) + return math.floor(total_items / 2) def get_median_order(self, orders: List[dict], trade_pair: TradePair) -> Order: """ From 0fb161db9c2980169cddf5d613c90f91753309e8 Mon Sep 17 00:00:00 2001 From: sli-tao Date: Wed, 25 Sep 2024 17:03:48 -0700 Subject: [PATCH 7/9] update tests for threshold --- tests/vali_tests/test_p2p_syncer.py | 31 +++++++++++++++++++---------- vali_objects/utils/p2p_syncer.py | 2 +- 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/tests/vali_tests/test_p2p_syncer.py b/tests/vali_tests/test_p2p_syncer.py index 49ed4cd81..1b8ffd470 100644 --- a/tests/vali_tests/test_p2p_syncer.py +++ b/tests/vali_tests/test_p2p_syncer.py @@ -199,6 +199,10 @@ def test_checkpoint_syncing_order_not_in_majority(self): assert len(self.p2p_syncer.golden["positions"][self.DEFAULT_MINER_HOTKEY]["positions"][0]["orders"]) == 2 def test_checkpoint_syncing_order_not_in_majority_with_multiple_positions(self): + """ + position 1 has order 1 and 2 in the majority, order 0 heuristic matches with 1 so it is not an additional order + position 2, order 3 and 4 heuristic match together + """ order1 = deepcopy(self.default_order) order1.order_uuid = "test_order1" order2 = deepcopy(self.default_order) @@ -253,7 +257,7 @@ def test_checkpoint_syncing_order_not_in_majority_with_multiple_positions(self): assert len(self.p2p_syncer.golden["positions"][self.DEFAULT_MINER_HOTKEY]["positions"]) == 2 assert 
len(self.p2p_syncer.golden["positions"][self.DEFAULT_MINER_HOTKEY]["positions"][0]["orders"]) == 2 - assert len(self.p2p_syncer.golden["positions"][self.DEFAULT_MINER_HOTKEY]["positions"][1]["orders"]) == 2 + assert len(self.p2p_syncer.golden["positions"][self.DEFAULT_MINER_HOTKEY]["positions"][1]["orders"]) == 1 def test_checkpoint_syncing_position_not_in_majority(self): order1 = deepcopy(self.default_order) @@ -341,6 +345,9 @@ def test_checkpoint_syncing_multiple_positions(self): assert len(self.p2p_syncer.golden["positions"][self.DEFAULT_MINER_HOTKEY]["positions"][0]["orders"]) == 1 def test_checkpoint_syncing_multiple_miners(self): + """ + miners included as long as they appear in majority of checkpoints + """ order1 = deepcopy(self.default_order) order1.order_uuid = "test_order1" orders = [order1] @@ -349,19 +356,20 @@ def test_checkpoint_syncing_multiple_miners(self): position.orders = orders position.rebuild_position_with_updated_orders() - checkpoint1 = {"positions": {self.DEFAULT_MINER_HOTKEY: {"positions": [json.loads(position.to_json_string())]}}} - checkpoint2 = {"positions": {self.DEFAULT_MINER_HOTKEY: {"positions": [json.loads(position.to_json_string())]}}} - order0 = deepcopy(self.default_order) order0.order_uuid = "test_order0" orders = [order0] - position = deepcopy(self.default_position) - position.position_uuid = "test_position2" - position.orders = orders - position.rebuild_position_with_updated_orders() + position1 = deepcopy(self.default_position) + position1.position_uuid = "test_position2" + position1.orders = orders + position1.rebuild_position_with_updated_orders() - checkpoint3 = {"positions": {"diff_miner": {"positions": [json.loads(position.to_json_string())]}}} - checkpoint4 = {"positions": {"diff_miner": {"positions": [json.loads(position.to_json_string())]}}} + checkpoint1 = {"positions": {self.DEFAULT_MINER_HOTKEY: {"positions": [json.loads(position.to_json_string())]}}} + checkpoint2 = {"positions": {self.DEFAULT_MINER_HOTKEY: 
{"positions": [json.loads(position.to_json_string())]}, + "diff_miner": {"positions": [json.loads(position1.to_json_string())]}}} + + checkpoint3 = {"positions": {"diff_miner": {"positions": [json.loads(position1.to_json_string())]}}} + checkpoint4 = {"positions": {"diff_miner": {"positions": [json.loads(position1.to_json_string())]}, self.DEFAULT_MINER_HOTKEY: {"positions": [json.loads(position.to_json_string())]}}} checkpoints = {"test_validator1": [0, checkpoint1], "test_validator2": [0, checkpoint2], "test_validator3": [0, checkpoint3], "test_validator4": [0, checkpoint4]} @@ -380,6 +388,9 @@ def test_checkpoint_syncing_one_of_each_miner(self): pass def test_checkpoint_syncing_miner_not_in_majority(self): + """ + if the miner does not appear in the majority of checkpoints it will not be included + """ order1 = deepcopy(self.default_order) order1.order_uuid = "test_order1" orders = [order1] diff --git a/vali_objects/utils/p2p_syncer.py b/vali_objects/utils/p2p_syncer.py index f3c21fea7..80478c814 100644 --- a/vali_objects/utils/p2p_syncer.py +++ b/vali_objects/utils/p2p_syncer.py @@ -221,7 +221,7 @@ def p2p_sync_positions(self, valid_checkpoints: dict): for validator_hotkey, checkpoint in valid_checkpoints.items(): positions = checkpoint.get("positions", {}) for miner_hotkey, miner_positions in positions.items(): - if miner_counts[miner_hotkey] < positions_threshold: + if miner_counts[miner_hotkey] <= positions_threshold: continue # combinations where the position_uuid appears in the majority From 45d5f0bc5c20593d8afb71923f35ed603e6f8518 Mon Sep 17 00:00:00 2001 From: sli-tao Date: Mon, 30 Sep 2024 15:36:39 -0700 Subject: [PATCH 8/9] attempt heuristic match for every position --- tests/vali_tests/test_p2p_syncer.py | 46 +----- vali_objects/utils/p2p_syncer.py | 184 ++++++++++------------ vali_objects/utils/validator_sync_base.py | 2 + 3 files changed, 85 insertions(+), 147 deletions(-) diff --git a/tests/vali_tests/test_p2p_syncer.py 
b/tests/vali_tests/test_p2p_syncer.py index 1b8ffd470..0b2486783 100644 --- a/tests/vali_tests/test_p2p_syncer.py +++ b/tests/vali_tests/test_p2p_syncer.py @@ -361,6 +361,7 @@ def test_checkpoint_syncing_multiple_miners(self): orders = [order0] position1 = deepcopy(self.default_position) position1.position_uuid = "test_position2" + position1.miner_hotkey = "diff_miner" position1.orders = orders position1.rebuild_position_with_updated_orders() @@ -431,51 +432,6 @@ def test_checkpoint_syncing_miner_not_in_majority(self): assert len(self.p2p_syncer.golden["positions"][self.DEFAULT_MINER_HOTKEY]["positions"]) == 1 assert len(self.p2p_syncer.golden["positions"][self.DEFAULT_MINER_HOTKEY]["positions"][0]["orders"]) == 1 - def test_heuristic_resolve_positions(self): - order1 = deepcopy(self.default_order) - order1.order_uuid = "test_order1" - order1.processed_ms = 100 - orders = [order1] - position1 = deepcopy(self.default_position) - position1.position_uuid = "test_position1" - position1.orders = orders - position1.rebuild_position_with_updated_orders() - - order2 = deepcopy(self.default_order) - order2.order_uuid = "test_order2" - order2.processed_ms = 110 - orders = [order2] - position2 = deepcopy(self.default_position) - position2.position_uuid = "test_position2" - position2.orders = orders - position2.rebuild_position_with_updated_orders() - - order3 = deepcopy(self.default_order) - order3.order_uuid = "test_order3" - order3.processed_ms = 90 - orders = [order3] - position3 = deepcopy(self.default_position) - position3.position_uuid = "test_position3" - position3.orders = orders - position3.rebuild_position_with_updated_orders() - - order4 = deepcopy(self.default_order) - order4.order_uuid = "test_order4" - order4.processed_ms = TimeUtil.now_in_millis() - 1000 * 60 * 10 - orders = [order4] - position4 = deepcopy(self.default_position) - position4.position_uuid = "test_position4" - position4.orders = orders - position4.rebuild_position_with_updated_orders() - - matrix 
= {'miner_hotkey_1': {self.DEFAULT_TRADE_PAIR: {'validator_hotkey_1': [json.loads(position1.to_json_string())], 'validator_hotkey_2': [json.loads(position2.to_json_string())]}}, - 'miner_hotkey_2': {self.DEFAULT_TRADE_PAIR: {'validator_hotkey_3': [json.loads(position3.to_json_string())], 'validator_hotkey_4': [json.loads(position4.to_json_string())]}}} - - matched_positions = self.p2p_syncer.heuristic_resolve_positions(matrix, 2, set()) - - assert len(matched_positions) == 1 - assert matched_positions[0]["position_uuid"] == "test_position1" - def test_checkpoint_last_order_time(self): order1 = deepcopy(self.default_order) order1.order_uuid = "test_order1" diff --git a/vali_objects/utils/p2p_syncer.py b/vali_objects/utils/p2p_syncer.py index 80478c814..84c136d1d 100644 --- a/vali_objects/utils/p2p_syncer.py +++ b/vali_objects/utils/p2p_syncer.py @@ -204,101 +204,106 @@ def p2p_sync_positions(self, valid_checkpoints: dict): positions_matrix = defaultdict(lambda: defaultdict(lambda: defaultdict(list))) # {miner hotkey: {trade pair: {validator hotkey: [all positions on validator]}}} orders_matrix = defaultdict(lambda: defaultdict(list)) # {position_uuid: {validator hotkey: [all orders on validator]}} + position_validators = defaultdict(set) # {position_uuid: {validator hotkeys}} + order_validators = defaultdict(set) # {order_uuid: {validator hotkeys}} + # parse each checkpoint to count occurrences of each position and order for hotkey, checkpoint in valid_checkpoints.items(): - self.parse_checkpoint_positions(hotkey, checkpoint, position_counts, order_counts, order_data, miner_to_uuids, miner_counts, positions_matrix, orders_matrix) + self.parse_checkpoint_positions(hotkey, checkpoint, position_counts, order_counts, order_data, miner_to_uuids, miner_counts, positions_matrix, orders_matrix, position_validators, order_validators) self.prune_position_orders(order_counts, orders_matrix) # miners who are still running legacy code. 
do not want to include them in checkpoint self.find_legacy_miners(len(valid_checkpoints), order_counts, miner_to_uuids, position_counts, order_data) # get the set of position_uuids that appear in the majority of checkpoints - positions_threshold = self.consensus_threshold(len(valid_checkpoints)) - majority_positions = {position_uuid for position_uuid, count in position_counts.items() if count > positions_threshold} + threshold = self.consensus_threshold(len(valid_checkpoints)) seen_positions = set() seen_orders = set() for validator_hotkey, checkpoint in valid_checkpoints.items(): positions = checkpoint.get("positions", {}) for miner_hotkey, miner_positions in positions.items(): - if miner_counts[miner_hotkey] <= positions_threshold: + if miner_counts[miner_hotkey] <= threshold: continue + matched_positions = self.construct_common_positions(miner_positions, seen_positions, seen_orders, order_data, orders_matrix, positions_matrix, validator_hotkey, threshold, position_validators, order_validators) + golden_positions[miner_hotkey]["positions"].extend(matched_positions) - # combinations where the position_uuid appears in the majority - uuid_matched_positions = self.construct_positions_uuid_in_majority(miner_positions, majority_positions, seen_positions, seen_orders, position_counts, order_counts, order_data, orders_matrix, validator_hotkey) - golden_positions[miner_hotkey]["positions"].extend(uuid_matched_positions) - - # combinations where the position_uuid does not appear in the majority, instead we use a heuristic match to combine positions - for position in self.heuristic_resolve_positions(positions_matrix, len(valid_checkpoints), seen_positions): - bt.logging.info(f"Position {position['position_uuid']} on miner {position['miner_hotkey']} matched, adding back in") - miner_hotkey = position["miner_hotkey"] - golden_positions[miner_hotkey]["positions"].append(position) - - # convert defaultdict to dict + # convert to dict return {miner: dict(golden_positions[miner]) 
for miner in golden_positions} - - def construct_positions_uuid_in_majority(self, miner_positions: dict, majority_positions: Set[str], seen_positions: Set[str], seen_orders: Set[str], position_counts: dict, order_counts: dict, order_data: dict, orders_matrix: dict, validator_hotkey: str) -> List[dict]: + def construct_common_positions(self, miner_positions: dict, seen_positions: Set[str], seen_orders: Set[str], order_data: dict, orders_matrix: dict, positions_matrix: dict, validator_hotkey: str, threshold: int, position_validators: dict, order_validators: dict) -> List[dict]: """ - return the positions to add to golden, when the position_uuid appears in the majority of checkpoints. + return the positions to add to golden, by attempting to match each position up with others. construct each position from its orders. if the order appears in the majority then it is taken, otherwise the order is attempted to be matched to other orders using a heuristic. - position_counts = defaultdict(int) # {position_uuid: count} - order_counts = defaultdict(lambda: defaultdict(int)) # {position_uuid: {order_uuid: count}} order_data = defaultdict(list) # {order_uuid: [{order}]} + positions_matrix = defaultdict(lambda: defaultdict(lambda: defaultdict(list))) # {miner hotkey: {trade pair: {validator hotkey: [all positions on validator]}}} orders_matrix = defaultdict(lambda: defaultdict(list)) # {position_uuid: {validator hotkey: [all orders on validator]}} """ uuid_matched_positions = [] resolved_orders = set() # separate from seen_orders, because we want to be able to match with seen orders + resolved_positions = set() for position in miner_positions["positions"]: position_uuid = position["position_uuid"] - # position exists on majority of validators - if position_uuid in majority_positions and position_uuid not in seen_positions: - seen_positions.add(position_uuid) - new_position = Position(**position) - new_position.orders = [] - - # get the set of order_uuids that appear in the 
majority of positions for a position_uuid - orders_threshold = self.consensus_threshold(position_counts[position_uuid]) - majority_orders = {order_uuid for order_uuid, count in order_counts[position_uuid].items() if count > orders_threshold} - - for order_uuid in order_counts[position_uuid].keys(): - if order_uuid not in seen_orders: - # combinations where the order_uuid appears in the majority - if order_uuid in majority_orders: - orders = order_data[order_uuid] - # combinations where the order_uuid does not appear in the majority, instead we use a heuristic to combine orders - else: - orders = self.find_matching_orders(order_data[order_uuid][0], orders_matrix[position_uuid], resolved_orders) - # order has matched with another order that has already been inserted - if not set([o["order_uuid"] for o in orders]).isdisjoint(seen_orders): - seen_orders.update([o["order_uuid"] for o in orders]) - continue - - if len(orders) > self.consensus_threshold(position_counts[position_uuid]): - bt.logging.info(f"Order {order_uuid} with Position {position_uuid} on miner {position['miner_hotkey']} matched with {[o['order_uuid'] for o in orders]}, adding back in") - else: - bt.logging.info(f"Order {order_uuid} with Position {position_uuid} only matched [{len(orders)}/{position_counts[position_uuid]}] times on miner {position['miner_hotkey']} with with {[o['order_uuid'] for o in orders]}. 
Skipping") - continue - - trade_pair = TradePair.from_trade_pair_id(position["trade_pair"][0]) - median_order = self.get_median_order(orders, trade_pair) - new_position.orders.append(median_order) - seen_orders.update([o["order_uuid"] for o in orders]) - - new_position.orders.sort(key=lambda o: o.processed_ms) - try: - new_position.rebuild_position_with_updated_orders() - position_dict = json.loads(new_position.to_json_string()) - uuid_matched_positions.append(position_dict) - except ValueError as v: - bt.logging.info(f"Miner [{new_position.miner_hotkey}] Position [{new_position.position_uuid}] Orders {[o.order_uuid for o in new_position.orders]} ValueError {v}") + miner_hotkey = position["miner_hotkey"] + trade_pair_id = position["trade_pair"][0] + + if position_uuid not in seen_positions: + # find all matches for a position + matched_positions = self.find_matching_positions(position, positions_matrix[miner_hotkey][trade_pair_id], resolved_positions, validator_hotkey, position_validators) + matched_position_uuids = [p["position_uuid"] for p in matched_positions] + matched_position_uuids_set = set(matched_position_uuids) + # see if some positions are heuristically matched + if len(matched_position_uuids_set) != 1: + bt.logging.info(f"Positions matched for miner {miner_hotkey}: {matched_position_uuids}") + + # ensure that we have not previously encountered/added this position or its matches + if len(matched_positions) > threshold and matched_position_uuids_set.isdisjoint(seen_positions): + new_position = Position(**position) + new_position.orders = [] + + # get all the orders that appear in these positions + orders_in_matched_positions = set() + # all the validators and orders for the positions + matched_positions_orders_matrix = defaultdict(list) + for p in matched_positions: + orders_in_matched_positions.update([o["order_uuid"] for o in p["orders"]]) + matched_positions_orders_matrix.update(orders_matrix[p["position_uuid"]]) + + # find all the common orders + for 
order_uuid in orders_in_matched_positions: + if order_uuid not in seen_orders: + matched_orders = self.find_matching_orders(order_data[order_uuid][0], matched_positions_orders_matrix, resolved_orders, order_validators) + matched_order_uuids = [o["order_uuid"] for o in matched_orders] + matched_order_uuids_set = set(matched_order_uuids) + # see if some orders are heuristically matched + if len(matched_order_uuids_set) != 1: + bt.logging.info(f"Orders matched for miner {miner_hotkey}: {matched_order_uuids}, across positions: {matched_position_uuids}") + + # ensure that we have not previously encountered/added this order or its matches + if len(matched_orders) > threshold and matched_order_uuids_set.isdisjoint(seen_orders): + # add an order to position + trade_pair = TradePair.from_trade_pair_id(trade_pair_id) + median_order = self.get_median_order(matched_orders, trade_pair) + new_position.orders.append(median_order) + # add to seen orders + seen_orders.update(matched_order_uuids_set) + # sort orders by processed time + new_position.orders.sort(key=lambda o: o.processed_ms) + try: + new_position.rebuild_position_with_updated_orders() + position_dict = json.loads(new_position.to_json_string()) + uuid_matched_positions.append(position_dict) + except ValueError as v: + bt.logging.info(f"Miner [{new_position.miner_hotkey}] Position [{new_position.position_uuid}] Orders {[o.order_uuid for o in new_position.orders]} ValueError {v}") + # add to seen positions + seen_positions.update(matched_position_uuids_set) return uuid_matched_positions - def find_matching_orders(self, order: dict, validator_to_orders: dict, resolved_orders: Set[str]) -> List[dict] | None: + def find_matching_orders(self, order: dict, validator_to_orders: dict, resolved_orders: Set[str], order_validators: dict) -> List[dict] | None: """ compare an order to all other orders associated with a position, and find all the matches using a heuristic. sort matches by order_uuid. 
@@ -314,6 +319,10 @@ def find_matching_orders(self, order: dict, validator_to_orders: dict, resolved_ if o["order_uuid"] in resolved_orders: continue + # if 2 orders order and o appear under the same checkpoint, they must be distinct and cannot match. + if o["order_uuid"] != order["order_uuid"] and not order_validators[order["order_uuid"]].isdisjoint(order_validators[o["order_uuid"]]): + continue + # orders must have the same order_uuid or same leverage, order_type, and processed_ms if self.dict_orders_aligned(order, o): matched_orders.append(o) @@ -353,7 +362,7 @@ def last_order_time_in_checkpoint(self, checkpoint: dict) -> int: latest_order_ms = max(latest_order_ms, order["processed_ms"]) return latest_order_ms - def parse_checkpoint_positions(self, validator_hotkey: str, checkpoint: dict, position_counts: dict, order_counts: dict, order_data: dict, miner_to_uuids: dict, miner_counts: dict, positions_matrix: dict, orders_matrix: dict): + def parse_checkpoint_positions(self, validator_hotkey: str, checkpoint: dict, position_counts: dict, order_counts: dict, order_data: dict, miner_to_uuids: dict, miner_counts: dict, positions_matrix: dict, orders_matrix: dict, position_validators: dict, order_validators: dict): """ parse checkpoint data @@ -365,6 +374,9 @@ def parse_checkpoint_positions(self, validator_hotkey: str, checkpoint: dict, po positions_matrix = defaultdict(lambda: defaultdict(lambda: defaultdict(list))) # {miner hotkey: {trade pair: {validator hotkey: [all positions on validator]}}} orders_matrix = defaultdict(lambda: defaultdict(list)) # {position_uuid: {validator_hotkey: [all orders]}} + + position_validators = defaultdict(set) # {position_uuid: [validator hotkeys]} + order_validators = defaultdict(set) # {order_uuid: [validator hotkeys]} """ # get positions for each miner positions = checkpoint.get("positions", {}) @@ -382,9 +394,11 @@ def parse_checkpoint_positions(self, validator_hotkey: str, checkpoint: dict, po 
order_data[order_uuid].append(dict(order)) miner_to_uuids[miner_hotkey]["orders"].add(order_uuid) orders_matrix[position_uuid][validator_hotkey].append(order) + order_validators[order_uuid].add(validator_hotkey) orders_matrix[position_uuid][validator_hotkey].sort(key=lambda o: o["processed_ms"]) positions_matrix[miner_hotkey][position["trade_pair"][0]][validator_hotkey].append(position) + position_validators[position_uuid].add(validator_hotkey) def prune_position_orders(self, order_counts: dict, orders_matrix: dict): """ @@ -455,45 +469,7 @@ def find_legacy_miners(self, num_checkpoints: int, order_counts: dict, miner_to_ bt.logging.info(f"legacy_miner_candidates: {legacy_miner_candidates}") return legacy_miners - def heuristic_resolve_positions(self, positions_matrix: dict, num_checkpoints: int, seen_positions: set) -> List[dict]: - """ - takes a matrix of unmatched positions, and returns a list of positions to add back in - positions_matrix: {miner hotkey: {trade pair: {validator hotkey: [all positions on validator]}}} - """ - resolved_position_uuids = set() - - matched_positions = [] - # want to resolve all the unmatched positions for the validators against each other - for miner_hotkey, trade_pairs in positions_matrix.items(): - for trade_pair, validator in trade_pairs.items(): - for validator_hotkey, position_list in validator.items(): - for position in position_list: - if position["position_uuid"] in seen_positions: - continue - matches = self.find_matching_positions(position, trade_pairs[trade_pair], resolved_position_uuids, validator_hotkey) - - if (len(matches) > self.consensus_threshold(num_checkpoints) and - set([match["position_uuid"] for match in matches]).isdisjoint(seen_positions)): - # median number of orders for matched positions - median_order_count = len(matches[len(matches) // 2]["orders"]) - # greatest common number of orders by 2 or more positions - max_common_order_count = 0 - for i in range(len(matches)-1): - if len(matches[i]["orders"]) == 
len(matches[i+1]["orders"]): - max_common_order_count = len(matches[i]["orders"]) - break - goal_order_count = max(median_order_count, max_common_order_count) - - matches_with_goal_order_count = [p for p in matches if len(p["orders"]) == goal_order_count] - bt.logging.info(f"Miner hotkey {miner_hotkey} has matches {[p['position_uuid'] for p in matches]}. goal_order_count: {goal_order_count}. matches_with_goal_order_count: {[p['position_uuid'] for p in matches_with_goal_order_count]}.") - matched_positions.append(matches_with_goal_order_count[0]) - else: - bt.logging.info(f"Position {position['position_uuid']} only matched [{len(matches)}/{num_checkpoints}] times on miner {position['miner_hotkey']} with matches {[p['position_uuid'] for p in matches]}. Skipping") - - seen_positions.update([p["position_uuid"] for p in matches]) - return matched_positions - - def find_matching_positions(self, position: dict, trade_pair_validator_positions: dict, resolved_positions: set, corresponding_validator_hotkey: str) -> List[dict]: + def find_matching_positions(self, position: dict, trade_pair_validator_positions: dict, resolved_positions: set, corresponding_validator_hotkey: str, position_validators: dict) -> List[dict]: """ compares a position from corresponding_validator_hotkey to all other positions with matching trade pair from all the other validators. positions are matched with a heuristic, and returned in a list sorted by number of orders then position_uuid @@ -518,6 +494,10 @@ def find_matching_positions(self, position: dict, trade_pair_validator_positions if p["position_uuid"] in resolved_positions: continue + # if 2 positions position and p appear under the same checkpoint, they must be distinct and cannot match. 
+ if p["position_uuid"] != position["position_uuid"] and not position_validators[p["position_uuid"]].isdisjoint(position_validators[position["position_uuid"]]): + continue + # positions have same position_type, # of orders, and open/close_ms times # or positions contain orders that are matched or have the same order_uuid if (self.dict_positions_aligned(position, p, validate_num_orders=True) or diff --git a/vali_objects/utils/validator_sync_base.py b/vali_objects/utils/validator_sync_base.py index 1f994c904..afb501545 100644 --- a/vali_objects/utils/validator_sync_base.py +++ b/vali_objects/utils/validator_sync_base.py @@ -274,6 +274,8 @@ def positions_aligned(self, p1, p2, timebound_ms=None, validate_num_orders=False return False def dict_positions_aligned(self, p1, p2, timebound_ms=None, validate_num_orders=False): + if p1["position_uuid"] == p2["position_uuid"]: + return True p1_initial_position_type = p1["orders"][0]["order_type"] p2_initial_position_type = p2["orders"][0]["order_type"] if validate_num_orders and len(p1["orders"]) != len(p2["orders"]): From 3f0d96ee08dec0df09d0ce1b6f2f82f040b47bba Mon Sep 17 00:00:00 2001 From: sli-tao Date: Tue, 1 Oct 2024 17:43:04 -0700 Subject: [PATCH 9/9] Revert "attempt heuristic match for every position" This reverts commit 45d5f0bc5c20593d8afb71923f35ed603e6f8518. 
--- tests/vali_tests/test_p2p_syncer.py | 46 +++++- vali_objects/utils/p2p_syncer.py | 184 ++++++++++++---------- vali_objects/utils/validator_sync_base.py | 2 - 3 files changed, 147 insertions(+), 85 deletions(-) diff --git a/tests/vali_tests/test_p2p_syncer.py b/tests/vali_tests/test_p2p_syncer.py index 0b2486783..1b8ffd470 100644 --- a/tests/vali_tests/test_p2p_syncer.py +++ b/tests/vali_tests/test_p2p_syncer.py @@ -361,7 +361,6 @@ def test_checkpoint_syncing_multiple_miners(self): orders = [order0] position1 = deepcopy(self.default_position) position1.position_uuid = "test_position2" - position1.miner_hotkey = "diff_miner" position1.orders = orders position1.rebuild_position_with_updated_orders() @@ -432,6 +431,51 @@ def test_checkpoint_syncing_miner_not_in_majority(self): assert len(self.p2p_syncer.golden["positions"][self.DEFAULT_MINER_HOTKEY]["positions"]) == 1 assert len(self.p2p_syncer.golden["positions"][self.DEFAULT_MINER_HOTKEY]["positions"][0]["orders"]) == 1 + def test_heuristic_resolve_positions(self): + order1 = deepcopy(self.default_order) + order1.order_uuid = "test_order1" + order1.processed_ms = 100 + orders = [order1] + position1 = deepcopy(self.default_position) + position1.position_uuid = "test_position1" + position1.orders = orders + position1.rebuild_position_with_updated_orders() + + order2 = deepcopy(self.default_order) + order2.order_uuid = "test_order2" + order2.processed_ms = 110 + orders = [order2] + position2 = deepcopy(self.default_position) + position2.position_uuid = "test_position2" + position2.orders = orders + position2.rebuild_position_with_updated_orders() + + order3 = deepcopy(self.default_order) + order3.order_uuid = "test_order3" + order3.processed_ms = 90 + orders = [order3] + position3 = deepcopy(self.default_position) + position3.position_uuid = "test_position3" + position3.orders = orders + position3.rebuild_position_with_updated_orders() + + order4 = deepcopy(self.default_order) + order4.order_uuid = "test_order4" + 
order4.processed_ms = TimeUtil.now_in_millis() - 1000 * 60 * 10 + orders = [order4] + position4 = deepcopy(self.default_position) + position4.position_uuid = "test_position4" + position4.orders = orders + position4.rebuild_position_with_updated_orders() + + matrix = {'miner_hotkey_1': {self.DEFAULT_TRADE_PAIR: {'validator_hotkey_1': [json.loads(position1.to_json_string())], 'validator_hotkey_2': [json.loads(position2.to_json_string())]}}, + 'miner_hotkey_2': {self.DEFAULT_TRADE_PAIR: {'validator_hotkey_3': [json.loads(position3.to_json_string())], 'validator_hotkey_4': [json.loads(position4.to_json_string())]}}} + + matched_positions = self.p2p_syncer.heuristic_resolve_positions(matrix, 2, set()) + + assert len(matched_positions) == 1 + assert matched_positions[0]["position_uuid"] == "test_position1" + def test_checkpoint_last_order_time(self): order1 = deepcopy(self.default_order) order1.order_uuid = "test_order1" diff --git a/vali_objects/utils/p2p_syncer.py b/vali_objects/utils/p2p_syncer.py index 84c136d1d..80478c814 100644 --- a/vali_objects/utils/p2p_syncer.py +++ b/vali_objects/utils/p2p_syncer.py @@ -204,106 +204,101 @@ def p2p_sync_positions(self, valid_checkpoints: dict): positions_matrix = defaultdict(lambda: defaultdict(lambda: defaultdict(list))) # {miner hotkey: {trade pair: {validator hotkey: [all positions on validator]}}} orders_matrix = defaultdict(lambda: defaultdict(list)) # {position_uuid: {validator hotkey: [all orders on validator]}} - position_validators = defaultdict(set) # {position_uuid: {validator hotkeys}} - order_validators = defaultdict(set) # {order_uuid: {validator hotkeys}} - # parse each checkpoint to count occurrences of each position and order for hotkey, checkpoint in valid_checkpoints.items(): - self.parse_checkpoint_positions(hotkey, checkpoint, position_counts, order_counts, order_data, miner_to_uuids, miner_counts, positions_matrix, orders_matrix, position_validators, order_validators) + 
self.parse_checkpoint_positions(hotkey, checkpoint, position_counts, order_counts, order_data, miner_to_uuids, miner_counts, positions_matrix, orders_matrix) self.prune_position_orders(order_counts, orders_matrix) # miners who are still running legacy code. do not want to include them in checkpoint self.find_legacy_miners(len(valid_checkpoints), order_counts, miner_to_uuids, position_counts, order_data) # get the set of position_uuids that appear in the majority of checkpoints - threshold = self.consensus_threshold(len(valid_checkpoints)) + positions_threshold = self.consensus_threshold(len(valid_checkpoints)) + majority_positions = {position_uuid for position_uuid, count in position_counts.items() if count > positions_threshold} seen_positions = set() seen_orders = set() for validator_hotkey, checkpoint in valid_checkpoints.items(): positions = checkpoint.get("positions", {}) for miner_hotkey, miner_positions in positions.items(): - if miner_counts[miner_hotkey] <= threshold: + if miner_counts[miner_hotkey] <= positions_threshold: continue - matched_positions = self.construct_common_positions(miner_positions, seen_positions, seen_orders, order_data, orders_matrix, positions_matrix, validator_hotkey, threshold, position_validators, order_validators) - golden_positions[miner_hotkey]["positions"].extend(matched_positions) - # convert to dict + # combinations where the position_uuid appears in the majority + uuid_matched_positions = self.construct_positions_uuid_in_majority(miner_positions, majority_positions, seen_positions, seen_orders, position_counts, order_counts, order_data, orders_matrix, validator_hotkey) + golden_positions[miner_hotkey]["positions"].extend(uuid_matched_positions) + + # combinations where the position_uuid does not appear in the majority, instead we use a heuristic match to combine positions + for position in self.heuristic_resolve_positions(positions_matrix, len(valid_checkpoints), seen_positions): + bt.logging.info(f"Position 
{position['position_uuid']} on miner {position['miner_hotkey']} matched, adding back in") + miner_hotkey = position["miner_hotkey"] + golden_positions[miner_hotkey]["positions"].append(position) + + # convert defaultdict to dict return {miner: dict(golden_positions[miner]) for miner in golden_positions} - def construct_common_positions(self, miner_positions: dict, seen_positions: Set[str], seen_orders: Set[str], order_data: dict, orders_matrix: dict, positions_matrix: dict, validator_hotkey: str, threshold: int, position_validators: dict, order_validators: dict) -> List[dict]: + + def construct_positions_uuid_in_majority(self, miner_positions: dict, majority_positions: Set[str], seen_positions: Set[str], seen_orders: Set[str], position_counts: dict, order_counts: dict, order_data: dict, orders_matrix: dict, validator_hotkey: str) -> List[dict]: """ - return the positions to add to golden, by attempting to match each position up with others. + return the positions to add to golden, when the position_uuid appears in the majority of checkpoints. construct each position from its orders. if the order appears in the majority then it is taken, otherwise the order is attempted to be matched to other orders using a heuristic. 
+ position_counts = defaultdict(int) # {position_uuid: count} + order_counts = defaultdict(lambda: defaultdict(int)) # {position_uuid: {order_uuid: count}} order_data = defaultdict(list) # {order_uuid: [{order}]} - positions_matrix = defaultdict(lambda: defaultdict(lambda: defaultdict(list))) # {miner hotkey: {trade pair: {validator hotkey: [all positions on validator]}}} orders_matrix = defaultdict(lambda: defaultdict(list)) # {position_uuid: {validator hotkey: [all orders on validator]}} """ uuid_matched_positions = [] resolved_orders = set() # separate from seen_orders, because we want to be able to match with seen orders - resolved_positions = set() for position in miner_positions["positions"]: position_uuid = position["position_uuid"] - miner_hotkey = position["miner_hotkey"] - trade_pair_id = position["trade_pair"][0] - - if position_uuid not in seen_positions: - # find all matches for a position - matched_positions = self.find_matching_positions(position, positions_matrix[miner_hotkey][trade_pair_id], resolved_positions, validator_hotkey, position_validators) - matched_position_uuids = [p["position_uuid"] for p in matched_positions] - matched_position_uuids_set = set(matched_position_uuids) - # see if some positions are heuristically matched - if len(matched_position_uuids_set) != 1: - bt.logging.info(f"Positions matched for miner {miner_hotkey}: {matched_position_uuids}") - - # ensure that we have not previously encountered/added this position or its matches - if len(matched_positions) > threshold and matched_position_uuids_set.isdisjoint(seen_positions): - new_position = Position(**position) - new_position.orders = [] - - # get all the orders that appear in these positions - orders_in_matched_positions = set() - # all the validators and orders for the positions - matched_positions_orders_matrix = defaultdict(list) - for p in matched_positions: - orders_in_matched_positions.update([o["order_uuid"] for o in p["orders"]]) - 
matched_positions_orders_matrix.update(orders_matrix[p["position_uuid"]]) - - # find all the common orders - for order_uuid in orders_in_matched_positions: - if order_uuid not in seen_orders: - matched_orders = self.find_matching_orders(order_data[order_uuid][0], matched_positions_orders_matrix, resolved_orders, order_validators) - matched_order_uuids = [o["order_uuid"] for o in matched_orders] - matched_order_uuids_set = set(matched_order_uuids) - # see if some orders are heuristically matched - if len(matched_order_uuids_set) != 1: - bt.logging.info(f"Orders matched for miner {miner_hotkey}: {matched_order_uuids}, across positions: {matched_position_uuids}") - - # ensure that we have not previously encountered/added this order or its matches - if len(matched_orders) > threshold and matched_order_uuids_set.isdisjoint(seen_orders): - # add an order to position - trade_pair = TradePair.from_trade_pair_id(trade_pair_id) - median_order = self.get_median_order(matched_orders, trade_pair) - new_position.orders.append(median_order) - # add to seen orders - seen_orders.update(matched_order_uuids_set) - # sort orders by processed time - new_position.orders.sort(key=lambda o: o.processed_ms) - try: - new_position.rebuild_position_with_updated_orders() - position_dict = json.loads(new_position.to_json_string()) - uuid_matched_positions.append(position_dict) - except ValueError as v: - bt.logging.info(f"Miner [{new_position.miner_hotkey}] Position [{new_position.position_uuid}] Orders {[o.order_uuid for o in new_position.orders]} ValueError {v}") - # add to seen positions - seen_positions.update(matched_position_uuids_set) + # position exists on majority of validators + if position_uuid in majority_positions and position_uuid not in seen_positions: + seen_positions.add(position_uuid) + new_position = Position(**position) + new_position.orders = [] + + # get the set of order_uuids that appear in the majority of positions for a position_uuid + orders_threshold = 
self.consensus_threshold(position_counts[position_uuid]) + majority_orders = {order_uuid for order_uuid, count in order_counts[position_uuid].items() if count > orders_threshold} + + for order_uuid in order_counts[position_uuid].keys(): + if order_uuid not in seen_orders: + # combinations where the order_uuid appears in the majority + if order_uuid in majority_orders: + orders = order_data[order_uuid] + # combinations where the order_uuid does not appear in the majority, instead we use a heuristic to combine orders + else: + orders = self.find_matching_orders(order_data[order_uuid][0], orders_matrix[position_uuid], resolved_orders) + # order has matched with another order that has already been inserted + if not set([o["order_uuid"] for o in orders]).isdisjoint(seen_orders): + seen_orders.update([o["order_uuid"] for o in orders]) + continue + + if len(orders) > self.consensus_threshold(position_counts[position_uuid]): + bt.logging.info(f"Order {order_uuid} with Position {position_uuid} on miner {position['miner_hotkey']} matched with {[o['order_uuid'] for o in orders]}, adding back in") + else: + bt.logging.info(f"Order {order_uuid} with Position {position_uuid} only matched [{len(orders)}/{position_counts[position_uuid]}] times on miner {position['miner_hotkey']} with {[o['order_uuid'] for o in orders]}. 
Skipping") + continue + + trade_pair = TradePair.from_trade_pair_id(position["trade_pair"][0]) + median_order = self.get_median_order(orders, trade_pair) + new_position.orders.append(median_order) + seen_orders.update([o["order_uuid"] for o in orders]) + + new_position.orders.sort(key=lambda o: o.processed_ms) + try: + new_position.rebuild_position_with_updated_orders() + position_dict = json.loads(new_position.to_json_string()) + uuid_matched_positions.append(position_dict) + except ValueError as v: + bt.logging.info(f"Miner [{new_position.miner_hotkey}] Position [{new_position.position_uuid}] Orders {[o.order_uuid for o in new_position.orders]} ValueError {v}") return uuid_matched_positions - def find_matching_orders(self, order: dict, validator_to_orders: dict, resolved_orders: Set[str], order_validators: dict) -> List[dict] | None: + def find_matching_orders(self, order: dict, validator_to_orders: dict, resolved_orders: Set[str]) -> List[dict] | None: """ compare an order to all other orders associated with a position, and find all the matches using a heuristic. sort matches by order_uuid. @@ -319,10 +314,6 @@ def find_matching_orders(self, order: dict, validator_to_orders: dict, resolved_ if o["order_uuid"] in resolved_orders: continue - # if 2 orders order and o appear under the same checkpoint, they must be distinct and cannot match. 
- if o["order_uuid"] != order["order_uuid"] and not order_validators[order["order_uuid"]].isdisjoint(order_validators[o["order_uuid"]]): - continue - # orders must have the same order_uuid or same leverage, order_type, and processed_ms if self.dict_orders_aligned(order, o): matched_orders.append(o) @@ -362,7 +353,7 @@ def last_order_time_in_checkpoint(self, checkpoint: dict) -> int: latest_order_ms = max(latest_order_ms, order["processed_ms"]) return latest_order_ms - def parse_checkpoint_positions(self, validator_hotkey: str, checkpoint: dict, position_counts: dict, order_counts: dict, order_data: dict, miner_to_uuids: dict, miner_counts: dict, positions_matrix: dict, orders_matrix: dict, position_validators: dict, order_validators: dict): + def parse_checkpoint_positions(self, validator_hotkey: str, checkpoint: dict, position_counts: dict, order_counts: dict, order_data: dict, miner_to_uuids: dict, miner_counts: dict, positions_matrix: dict, orders_matrix: dict): """ parse checkpoint data @@ -374,9 +365,6 @@ def parse_checkpoint_positions(self, validator_hotkey: str, checkpoint: dict, po positions_matrix = defaultdict(lambda: defaultdict(lambda: defaultdict(list))) # {miner hotkey: {trade pair: {validator hotkey: [all positions on validator]}}} orders_matrix = defaultdict(lambda: defaultdict(list)) # {position_uuid: {validator_hotkey: [all orders]}} - - position_validators = defaultdict(set) # {position_uuid: [validator hotkeys]} - order_validators = defaultdict(set) # {order_uuid: [validator hotkeys]} """ # get positions for each miner positions = checkpoint.get("positions", {}) @@ -394,11 +382,9 @@ def parse_checkpoint_positions(self, validator_hotkey: str, checkpoint: dict, po order_data[order_uuid].append(dict(order)) miner_to_uuids[miner_hotkey]["orders"].add(order_uuid) orders_matrix[position_uuid][validator_hotkey].append(order) - order_validators[order_uuid].add(validator_hotkey) orders_matrix[position_uuid][validator_hotkey].sort(key=lambda o: 
o["processed_ms"]) positions_matrix[miner_hotkey][position["trade_pair"][0]][validator_hotkey].append(position) - position_validators[position_uuid].add(validator_hotkey) def prune_position_orders(self, order_counts: dict, orders_matrix: dict): """ @@ -469,7 +455,45 @@ def find_legacy_miners(self, num_checkpoints: int, order_counts: dict, miner_to_ bt.logging.info(f"legacy_miner_candidates: {legacy_miner_candidates}") return legacy_miners - def find_matching_positions(self, position: dict, trade_pair_validator_positions: dict, resolved_positions: set, corresponding_validator_hotkey: str, position_validators: dict) -> List[dict]: + def heuristic_resolve_positions(self, positions_matrix: dict, num_checkpoints: int, seen_positions: set) -> List[dict]: + """ + takes a matrix of unmatched positions, and returns a list of positions to add back in + positions_matrix: {miner hotkey: {trade pair: {validator hotkey: [all positions on validator]}}} + """ + resolved_position_uuids = set() + + matched_positions = [] + # want to resolve all the unmatched positions for the validators against each other + for miner_hotkey, trade_pairs in positions_matrix.items(): + for trade_pair, validator in trade_pairs.items(): + for validator_hotkey, position_list in validator.items(): + for position in position_list: + if position["position_uuid"] in seen_positions: + continue + matches = self.find_matching_positions(position, trade_pairs[trade_pair], resolved_position_uuids, validator_hotkey) + + if (len(matches) > self.consensus_threshold(num_checkpoints) and + set([match["position_uuid"] for match in matches]).isdisjoint(seen_positions)): + # median number of orders for matched positions + median_order_count = len(matches[len(matches) // 2]["orders"]) + # greatest common number of orders by 2 or more positions + max_common_order_count = 0 + for i in range(len(matches)-1): + if len(matches[i]["orders"]) == len(matches[i+1]["orders"]): + max_common_order_count = len(matches[i]["orders"]) + 
break + goal_order_count = max(median_order_count, max_common_order_count) + + matches_with_goal_order_count = [p for p in matches if len(p["orders"]) == goal_order_count] + bt.logging.info(f"Miner hotkey {miner_hotkey} has matches {[p['position_uuid'] for p in matches]}. goal_order_count: {goal_order_count}. matches_with_goal_order_count: {[p['position_uuid'] for p in matches_with_goal_order_count]}.") + matched_positions.append(matches_with_goal_order_count[0]) + else: + bt.logging.info(f"Position {position['position_uuid']} only matched [{len(matches)}/{num_checkpoints}] times on miner {position['miner_hotkey']} with matches {[p['position_uuid'] for p in matches]}. Skipping") + + seen_positions.update([p["position_uuid"] for p in matches]) + return matched_positions + + def find_matching_positions(self, position: dict, trade_pair_validator_positions: dict, resolved_positions: set, corresponding_validator_hotkey: str) -> List[dict]: """ compares a position from corresponding_validator_hotkey to all other positions with matching trade pair from all the other validators. positions are matched with a heuristic, and returned in a list sorted by number of orders then position_uuid @@ -494,10 +518,6 @@ def find_matching_positions(self, position: dict, trade_pair_validator_positions if p["position_uuid"] in resolved_positions: continue - # if 2 positions position and p appear under the same checkpoint, they must be distinct and cannot match. 
- if p["position_uuid"] != position["position_uuid"] and not position_validators[p["position_uuid"]].isdisjoint(position_validators[position["position_uuid"]]): - continue - # positions have same position_type, # of orders, and open/close_ms times # or positions contain orders that are matched or have the same order_uuid if (self.dict_positions_aligned(position, p, validate_num_orders=True) or diff --git a/vali_objects/utils/validator_sync_base.py b/vali_objects/utils/validator_sync_base.py index afb501545..1f994c904 100644 --- a/vali_objects/utils/validator_sync_base.py +++ b/vali_objects/utils/validator_sync_base.py @@ -274,8 +274,6 @@ def positions_aligned(self, p1, p2, timebound_ms=None, validate_num_orders=False return False def dict_positions_aligned(self, p1, p2, timebound_ms=None, validate_num_orders=False): - if p1["position_uuid"] == p2["position_uuid"]: - return True p1_initial_position_type = p1["orders"][0]["order_type"] p2_initial_position_type = p2["orders"][0]["order_type"] if validate_num_orders and len(p1["orders"]) != len(p2["orders"]):