2 changes: 1 addition & 1 deletion meta/meta.json
@@ -1,3 +1,3 @@
{
"subnet_version": "4.2.11"
"subnet_version": "4.3.0"
}
2 changes: 1 addition & 1 deletion neurons/validator.py
@@ -96,7 +96,7 @@ def __init__(self):

self.config = self.get_config()
# Use the getattr function to safely get the autosync attribute with a default of False if not found.
self.auto_sync = getattr(self.config, 'autosync', False) and 'mothership' not in ValiUtils.get_secrets()
self.auto_sync = False
self.is_mainnet = self.config.netuid == 8
# Ensure the directory for logging exists, else create one.
if not os.path.exists(self.config.full_path):
31 changes: 21 additions & 10 deletions tests/vali_tests/test_p2p_syncer.py
@@ -199,6 +199,10 @@ def test_checkpoint_syncing_order_not_in_majority(self):
assert len(self.p2p_syncer.golden["positions"][self.DEFAULT_MINER_HOTKEY]["positions"][0]["orders"]) == 2

def test_checkpoint_syncing_order_not_in_majority_with_multiple_positions(self):
"""
Position 1 has orders 1 and 2 in the majority; order 0 heuristically matches order 1, so it is not counted as an additional order.
In position 2, orders 3 and 4 heuristically match each other.
"""
order1 = deepcopy(self.default_order)
order1.order_uuid = "test_order1"
order2 = deepcopy(self.default_order)
@@ -253,7 +257,7 @@ def test_checkpoint_syncing_order_not_in_majority_with_multiple_positions(self):

assert len(self.p2p_syncer.golden["positions"][self.DEFAULT_MINER_HOTKEY]["positions"]) == 2
assert len(self.p2p_syncer.golden["positions"][self.DEFAULT_MINER_HOTKEY]["positions"][0]["orders"]) == 2
assert len(self.p2p_syncer.golden["positions"][self.DEFAULT_MINER_HOTKEY]["positions"][1]["orders"]) == 2
assert len(self.p2p_syncer.golden["positions"][self.DEFAULT_MINER_HOTKEY]["positions"][1]["orders"]) == 1

def test_checkpoint_syncing_position_not_in_majority(self):
order1 = deepcopy(self.default_order)
@@ -341,6 +345,9 @@ def test_checkpoint_syncing_multiple_positions(self):
assert len(self.p2p_syncer.golden["positions"][self.DEFAULT_MINER_HOTKEY]["positions"][0]["orders"]) == 1

def test_checkpoint_syncing_multiple_miners(self):
"""
Miners are included as long as they appear in the majority of checkpoints.
"""
order1 = deepcopy(self.default_order)
order1.order_uuid = "test_order1"
orders = [order1]
@@ -349,19 +356,20 @@ def test_checkpoint_syncing_multiple_miners(self):
position.orders = orders
position.rebuild_position_with_updated_orders()

checkpoint1 = {"positions": {self.DEFAULT_MINER_HOTKEY: {"positions": [json.loads(position.to_json_string())]}}}
checkpoint2 = {"positions": {self.DEFAULT_MINER_HOTKEY: {"positions": [json.loads(position.to_json_string())]}}}

order0 = deepcopy(self.default_order)
order0.order_uuid = "test_order0"
orders = [order0]
position = deepcopy(self.default_position)
position.position_uuid = "test_position2"
position.orders = orders
position.rebuild_position_with_updated_orders()
position1 = deepcopy(self.default_position)
position1.position_uuid = "test_position2"
position1.orders = orders
position1.rebuild_position_with_updated_orders()

checkpoint3 = {"positions": {"diff_miner": {"positions": [json.loads(position.to_json_string())]}}}
checkpoint4 = {"positions": {"diff_miner": {"positions": [json.loads(position.to_json_string())]}}}
checkpoint1 = {"positions": {self.DEFAULT_MINER_HOTKEY: {"positions": [json.loads(position.to_json_string())]}}}
checkpoint2 = {"positions": {self.DEFAULT_MINER_HOTKEY: {"positions": [json.loads(position.to_json_string())]},
"diff_miner": {"positions": [json.loads(position1.to_json_string())]}}}

checkpoint3 = {"positions": {"diff_miner": {"positions": [json.loads(position1.to_json_string())]}}}
checkpoint4 = {"positions": {"diff_miner": {"positions": [json.loads(position1.to_json_string())]}, self.DEFAULT_MINER_HOTKEY: {"positions": [json.loads(position.to_json_string())]}}}

checkpoints = {"test_validator1": [0, checkpoint1], "test_validator2": [0, checkpoint2],
"test_validator3": [0, checkpoint3], "test_validator4": [0, checkpoint4]}
@@ -380,6 +388,9 @@ def test_checkpoint_syncing_one_of_each_miner(self):
pass

def test_checkpoint_syncing_miner_not_in_majority(self):
"""
If a miner does not appear in the majority of checkpoints, it is not included.
"""
order1 = deepcopy(self.default_order)
order1.order_uuid = "test_order1"
orders = [order1]
45 changes: 22 additions & 23 deletions vali_objects/utils/p2p_syncer.py
@@ -32,6 +32,7 @@ def __init__(self, wallet=None, metagraph=None, is_testnet=None, shutdown_dict=N
self.created_golden = False
self.last_signal_sync_time_ms = 0
self.running_unit_tests = running_unit_tests
self.min_checkpoints = 1 if running_unit_tests else ValiConfig.MIN_CHECKPOINTS_RECEIVED

def send_checkpoint_requests(self):
"""
@@ -84,7 +85,7 @@ def send_checkpoint_requests(self):

bt.logging.info(f"{n_successful_checkpoints} responses succeeded. {n_failures} responses failed")

if (n_successful_checkpoints > 0 and self.is_testnet) or n_successful_checkpoints >= ValiConfig.MIN_CHECKPOINTS_RECEIVED:
if (n_successful_checkpoints > 0 and self.is_testnet) or n_successful_checkpoints >= self.min_checkpoints:
# sort all our successful responses by validator_trust
sorted_v_trust = sorted(hotkey_to_received_checkpoint.items(), key=lambda item: item[1][0], reverse=True)
hotkey_to_received_checkpoint = {checkpoint[0]: checkpoint[1] for checkpoint in sorted_v_trust}
@@ -122,8 +123,8 @@ def create_golden(self, trusted_checkpoints: dict) -> bool:
else:
bt.logging.info(f"Checkpoint from validator {hotkey} is stale with newest order timestamp {latest_order_ms}, {round((TimeUtil.now_in_millis() - latest_order_ms)/(1000 * 60 * 60))} hrs ago, Skipping.")

if len(valid_checkpoints) == 0:
bt.logging.info(f"All {len(trusted_checkpoints)} checkpoints are stale, unable to build golden.")
if len(valid_checkpoints) < self.min_checkpoints:
bt.logging.info(f"Only {len(valid_checkpoints)} checkpoints are not stale, unable to build golden. Min required: {self.min_checkpoints}")
return False
else:
bt.logging.info(f"Building golden from [{len(valid_checkpoints)}/{len(trusted_checkpoints)}] up-to-date checkpoints.")
@@ -161,8 +162,8 @@ def p2p_sync_challengeperiod(self, valid_checkpoints: dict):
self.parse_checkpoint_challengeperiod(checkpoint, challengeperiod_testing_data, challengeperiod_success_data)

threshold = self.consensus_threshold(len(valid_checkpoints))
majority_testing = {hotkey for hotkey, times in challengeperiod_testing_data.items() if len(times) >= threshold}
majority_success = {hotkey for hotkey, times in challengeperiod_success_data.items() if len(times) >= threshold}
majority_testing = {hotkey for hotkey, times in challengeperiod_testing_data.items() if len(times) > threshold}
majority_success = {hotkey for hotkey, times in challengeperiod_success_data.items() if len(times) > threshold}

for hotkey in majority_testing:
challengeperiod_testing[hotkey] = statistics.median_low(challengeperiod_testing_data[hotkey])
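For context, statistics.median_low always returns one of the observed values (the lower of the two middle elements when the count is even), so the chosen challenge-period time is always one that some validator actually reported. A quick illustration:

import statistics

times = [1000, 2000, 3000, 4000]
statistics.median_low(times)  # 2000 -- an actual reported value
statistics.median(times)      # 2500.0 -- would be an interpolated value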
@@ -213,14 +214,14 @@ def p2p_sync_positions(self, valid_checkpoints: dict):

# get the set of position_uuids that appear in the majority of checkpoints
positions_threshold = self.consensus_threshold(len(valid_checkpoints))
majority_positions = {position_uuid for position_uuid, count in position_counts.items() if count >= positions_threshold}
majority_positions = {position_uuid for position_uuid, count in position_counts.items() if count > positions_threshold}
seen_positions = set()
seen_orders = set()

for validator_hotkey, checkpoint in valid_checkpoints.items():
positions = checkpoint.get("positions", {})
for miner_hotkey, miner_positions in positions.items():
if miner_counts[miner_hotkey] < positions_threshold:
if miner_counts[miner_hotkey] <= positions_threshold:
continue

# combinations where the position_uuid appears in the majority
@@ -262,7 +263,7 @@ def construct_positions_uuid_in_majority(self, miner_positions: dict, majority_p

# get the set of order_uuids that appear in the majority of positions for a position_uuid
orders_threshold = self.consensus_threshold(position_counts[position_uuid])
majority_orders = {order_uuid for order_uuid, count in order_counts[position_uuid].items() if count >= orders_threshold}
majority_orders = {order_uuid for order_uuid, count in order_counts[position_uuid].items() if count > orders_threshold}

for order_uuid in order_counts[position_uuid].keys():
if order_uuid not in seen_orders:
@@ -277,7 +278,7 @@ def construct_positions_uuid_in_majority(self, miner_positions: dict, majority_p
seen_orders.update([o["order_uuid"] for o in orders])
continue

if len(orders) > self.consensus_threshold(position_counts[position_uuid], heuristic_match=True):
if len(orders) > self.consensus_threshold(position_counts[position_uuid]):
bt.logging.info(f"Order {order_uuid} with Position {position_uuid} on miner {position['miner_hotkey']} matched with {[o['order_uuid'] for o in orders]}, adding back in")
else:
bt.logging.info(f"Order {order_uuid} with Position {position_uuid} only matched [{len(orders)}/{position_counts[position_uuid]}] times on miner {position['miner_hotkey']} with with {[o['order_uuid'] for o in orders]}. Skipping")
@@ -471,7 +472,7 @@ def heuristic_resolve_positions(self, positions_matrix: dict, num_checkpoints: i
continue
matches = self.find_matching_positions(position, trade_pairs[trade_pair], resolved_position_uuids, validator_hotkey)

if (len(matches) > self.consensus_threshold(num_checkpoints, heuristic_match=True) and
if (len(matches) > self.consensus_threshold(num_checkpoints) and
set([match["position_uuid"] for match in matches]).isdisjoint(seen_positions)):
# median number of orders for matched positions
median_order_count = len(matches[len(matches) // 2]["orders"])
@@ -484,7 +485,7 @@ def heuristic_resolve_positions(self, positions_matrix: dict, num_checkpoints: i
goal_order_count = max(median_order_count, max_common_order_count)

matches_with_goal_order_count = [p for p in matches if len(p["orders"]) == goal_order_count]
bt.logging.info(f"Miner hotkey {miner_hotkey} has matches {[p['position_uuid'] for p in matches]}. goal_order_count: {goal_order_count}. matches_with_goal_order_count: {matches_with_goal_order_count}.")
bt.logging.info(f"Miner hotkey {miner_hotkey} has matches {[p['position_uuid'] for p in matches]}. goal_order_count: {goal_order_count}. matches_with_goal_order_count: {[p['position_uuid'] for p in matches_with_goal_order_count]}.")
matched_positions.append(matches_with_goal_order_count[0])
else:
bt.logging.info(f"Position {position['position_uuid']} only matched [{len(matches)}/{num_checkpoints}] times on miner {position['miner_hotkey']} with matches {[p['position_uuid'] for p in matches]}. Skipping")
@@ -531,14 +532,11 @@ def find_matching_positions(self, position: dict, trade_pair_validator_positions
matched_positions.sort(key=lambda x: (-len(x["orders"]), x["position_uuid"]))
return matched_positions

def consensus_threshold(self, total_items: int, heuristic_match: bool=False) -> int:
def consensus_threshold(self, total_items: int) -> int:
"""
threshold for including a position or order in the golden
"""
if heuristic_match:
return math.floor(total_items / 2)
else:
return math.ceil(total_items / 2)
return math.floor(total_items / 2)
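For reference, a minimal standalone sketch (not part of the diff) of what the switch from ceil with >= to floor with > means for how many checkpoints must agree; the two rules coincide for odd totals, but the new one requires a strict majority for even totals:

import math

def consensus_threshold(total_items: int) -> int:
    return math.floor(total_items / 2)

for n in (3, 4, 5, 6):
    new_needed = consensus_threshold(n) + 1  # smallest count satisfying count > threshold
    old_needed = math.ceil(n / 2)            # smallest count satisfying count >= ceil(n / 2)
    print(n, new_needed, old_needed)
# n=3: 2 vs 2, n=4: 3 vs 2, n=5: 3 vs 3, n=6: 4 vs 3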

def get_median_order(self, orders: List[dict], trade_pair: TradePair) -> Order:
"""
@@ -588,19 +586,20 @@ def sync_positions_with_cooldown(self):
if not (47 < datetime_now.minute < 57):
return
else:
# Check if we are between 7:09 AM and 7:19 AM UTC
# Temp change time to 21:00 UTC so we can see the effects in shadow mode ASAP
# Check if we are between 1:19 AM and 1:29 AM UTC
if not (datetime_now.hour == 1 and (18 < datetime_now.minute < 30)):
return

try:
bt.logging.info("Calling send_checkpoint_requests")
bt.logging.info("Sending checkpoint requests")
self.golden = None
self.send_checkpoint_requests()
if self.created_golden:
bt.logging.info("Calling apply_golden")
# TODO guard sync_positions with the signal lock once we move on from shadow mode
self.sync_positions(True, candidate_data=self.golden)
bt.logging.info("Golden created. Syncing positions.")
with self.signal_sync_lock:
while self.n_orders_being_processed[0] > 0:
self.signal_sync_condition.wait()
self.sync_positions(False, candidate_data=self.golden)
except Exception as e:
bt.logging.error(f"Error sending checkpoint: {e}")
bt.logging.error(traceback.format_exc())
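A minimal standalone sketch of the wait-for-in-flight-orders pattern used above, assuming the order-processing path decrements the counter and notifies under the same lock (the attribute names mirror the diff, but the helper functions are hypothetical):

import threading

signal_sync_lock = threading.Lock()
signal_sync_condition = threading.Condition(signal_sync_lock)
n_orders_being_processed = [0]  # mutable counter shared with the order-processing path

def wait_then_sync(sync_positions):
    # Block until no orders are in flight, then sync while still holding the lock
    # so new order processing cannot interleave with the write.
    with signal_sync_lock:
        while n_orders_being_processed[0] > 0:
            signal_sync_condition.wait()
        sync_positions()

def order_processing_finished():
    # Called by the order-processing path when it finishes an order.
    with signal_sync_lock:
        n_orders_being_processed[0] -= 1
        signal_sync_condition.notify_all()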
@@ -612,4 +611,4 @@ def sync_positions_with_cooldown(self):
position_syncer = P2PSyncer(is_testnet=True)
position_syncer.send_checkpoint_requests()
if position_syncer.created_golden:
position_syncer.sync_positions(True, candidate_data=position_syncer.golden)
position_syncer.sync_positions(False, candidate_data=position_syncer.golden)
19 changes: 11 additions & 8 deletions vali_objects/utils/validator_sync_base.py
@@ -179,23 +179,26 @@ def write_modifications(self, position_to_sync_status, stats):
# self.debug_print_pos(position)
# print('---status', sync_status)

allow_writes = (not self.is_mothership or
(self.is_mothership and (stats['deleted'] == 0 or stats['deleted'] == stats['inserted'])))
bt.logging.info(f'allow_writes: {allow_writes} stats: {stats}')
# Deletions happen first
for position, sync_status in position_to_sync_status.items():
if sync_status == PositionSyncResult.DELETED:
deleted -= 1
if not self.is_mothership:
if allow_writes:
self.position_manager.delete_position_from_disk(position)

# Updates happen next
# First close out contradicting positions that happen if a validator is left in a bad state
#for position, sync_status in position_to_sync_status.items():
# if sync_status == PositionSyncResult.UPDATED or sync_status == PositionSyncResult.NOTHING:
# if not self.is_mothership:
# if position.is_closed_position:
# self.position_manager.delete_open_position_if_exists(position)
# for position, sync_status in position_to_sync_status.items():
# if sync_status == PositionSyncResult.UPDATED or sync_status == PositionSyncResult.NOTHING:
# if allow_writes:
# if position.is_closed_position:
# self.position_manager.delete_open_position_if_exists(position)
for position, sync_status in position_to_sync_status.items():
if sync_status == PositionSyncResult.UPDATED:
if not self.is_mothership:
if allow_writes:
positions = self.split_position_on_flat(position)
for p in positions:
if p.is_open_position:
@@ -206,7 +209,7 @@ def write_modifications(self, position_to_sync_status, stats):
for position, sync_status in position_to_sync_status.items():
if sync_status == PositionSyncResult.INSERTED:
inserted -= 1
if not self.is_mothership:
if allow_writes:
positions = self.split_position_on_flat(position)
for p in positions:
if p.is_open_position:
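The new allow_writes guard in write_modifications can be restated as a small standalone function (a sketch under the assumption that stats carries integer 'deleted' and 'inserted' counts): regular validators always apply modifications, while the mothership only writes when the sync would not drop positions on net.

def allow_writes(is_mothership: bool, stats: dict) -> bool:
    # Non-mothership validators always apply modifications. The mothership only
    # applies them when nothing was deleted, or when deletions are exactly balanced
    # by insertions (presumably positions replaced rather than removed outright).
    if not is_mothership:
        return True
    return stats['deleted'] == 0 or stats['deleted'] == stats['inserted']

allow_writes(False, {'deleted': 3, 'inserted': 0})  # True  -- regular validator
allow_writes(True,  {'deleted': 3, 'inserted': 0})  # False -- mothership would lose positions
allow_writes(True,  {'deleted': 2, 'inserted': 2})  # True  -- balanced delete/insert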