Skip to content

Commit

Permalink
vali
Browse files Browse the repository at this point in the history
  • Loading branch information
latentvector committed May 15, 2024
1 parent 0052287 commit 161ceb1
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 55 deletions.
20 changes: 1 addition & 19 deletions commune/module/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -1257,27 +1257,9 @@ def check_used_ports(self, start_port = 8501, end_port = 8600, timeout=5):
for port in range(*port_range):
used_ports[port] = self.port_used(port)
return used_ports


@classmethod
def kill_port(cls, port:int, mode='bash')-> str:

if not c.port_used(port):
return {'success': True, 'msg': f'port {port} is not in use'}
if mode == 'python':
import signal
from psutil import process_iter
'''
Kills the port {port} on the localhost
'''
for proc in process_iter():
for conns in proc.connections(kind='inet'):
if conns.laddr.port == port:
proc.send_signal(signal.SIGKILL) # or SIGKILL
return port
elif mode == 'bash':
c.cmd(f'kill -9 $(lsof -ti:{port})', bash=True, verbose=True)
return {'success': True, 'msg': f'killed port {port}'}
return c.module('os').kill_port(port=port, mode=mode)

@classmethod
def pm2_restart_all(cls):
Expand Down
20 changes: 20 additions & 0 deletions commune/os.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,4 +581,24 @@ def sys_path(cls):
return sys.path


@classmethod
def kill_port(cls, port:int, mode:str='python'):

if not c.port_used(port):
return {'success': True, 'msg': f'port {port} is not in use'}
if mode == 'python':
import signal
from psutil import process_iter
'''
Kills the port {port} on the localhost
'''
for proc in process_iter():
for conns in proc.connections(kind='inet'):
if conns.laddr.port == port:
proc.send_signal(signal.SIGKILL) # or SIGKILL
return port
elif mode == 'bash':
c.cmd(f'kill -9 $(lsof -ti:{port})', bash=True, verbose=True)
return {'success': True, 'msg': f'killed port {port}'}


85 changes: 51 additions & 34 deletions commune/vali/vali.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ class Vali(c.Module):
errors = 0
requests = 0
successes = 0
epochs = 0
voting_networks: ['subspace', 'bittensor']
score_fns = ['score_module', 'score'] # the score functions
whitelist = ['eval_module', 'score_module', 'eval', 'leaderboard']
Expand All @@ -22,6 +23,7 @@ def __init__(self,

def init_vali(self, config=None, module=None, score_fn=None, **kwargs):


if score_fn != None:
self.set_score_fn(score_fn)
if module != None:
Expand Down Expand Up @@ -97,14 +99,38 @@ def worker_name(self, id = 0):

def age(self):
return c.time() - self.start_time


def process_result(self, result):
w = result.get('w', 0)
address = result.get('address', 'unknown')
name = result.get('name', 'unknown')
c.print(f'<Result>:: {name}({address}) --> {w}', color='purple', verbose=self.config.verbose or self.config.debug)
if c.is_error(result):
self.epoch_info['errors'] += 1
else:
# record the success statistics
if result.get('w', 0) > 0:
self.epoch_info['successes'] += 1
self.epoch_info['last_success'] = c.time()
else:
self.epoch_info['errors'] += 1
return result


def worker(self,
epochs=1e9,
id=0):
for epoch in range(int(epochs)):
try:
t0 = c.time()
self.epoch()
t1 = c.time()
latency = t1 - t0
if latency < self.config.min_update_interval:
sleep_time = (self.config.min_update_interval - latency)
c.print(f'Sleeping for {sleep_time} seconds', color='yellow')
c.sleep(sleep_time)
except Exception as e:
c.print('Dawg, theres an error in the epoch')
c.print(c.detailed_error(e))
Expand All @@ -118,10 +144,23 @@ def run_epoch(cls, network='local', **kwargs):


def epoch(self, batch_size = None, network=None, **kwargs):

self.epochs += 1

if not hasattr(self, 'epoch_info'):
self.epoch_info = {
'requests': 0,
'errors': 0,
'successes': 0,
'last_sent': 0,
'last_success': 0,
'batch_size': 0,
'epochs': self.epochs,
}
futures = []
results = []
module_addresses = c.shuffle(list(self.namespace.values()))
c.print(f'Epoch {self.epochs} with {len(module_addresses)} modules', color='yellow')
batch_size = min(self.config.batch_size, len(module_addresses))
self.executor = c.module('executor.thread')(max_workers=batch_size)
batch_size = self.config.batch_size
Expand All @@ -137,24 +176,16 @@ def epoch(self, batch_size = None, network=None, **kwargs):
continue
futures.append(self.executor.submit(self.eval, args=[module_address],timeout=self.config.timeout))
self.epoch_info['last_sent'] = c.time()
self.epoch_info['requests'] = len(futures)
self.address2last_update[module_address] = self.epoch_info['last_sent']


if len(futures) >= batch_size:
try:
for future in c.as_completed(futures, timeout=self.config.timeout):
result = future.result()
c.print(result, verbose=self.config.debug or self.config.verbose)
futures.remove(future)
if c.is_error(result):
c.print('ERROR', result, verbose=self.config.verbose)
self.errors += 1
else:
# record the success statistics
if result.get('w', 0) > 0:
self.epoch_info['successes'] += 1
self.epoch_info['last_success'] = c.time()

result = self.process_result(result)
results += [result]
break
except Exception as e:
Expand All @@ -165,20 +196,12 @@ def epoch(self, batch_size = None, network=None, **kwargs):
for future in c.as_completed(futures, timeout=self.config.timeout):
futures.remove(future) # remove the future
result = future.result() # result
result = self.process_result(result)
results += [result]
except Exception as e:
c.print('ERROR',c.detailed_error(e))
return results

epoch_info = {
'requests': 0,
'errors': 0,
'successes': 0,
'last_sent': 0,
'last_success': 0,
'batch_size': 0,
}

def network_staleness(self):
# return the time since the last sync with the network
return c.time() - self.last_sync_time
Expand Down Expand Up @@ -250,20 +273,7 @@ def set_network(self,
@property
def verbose(self):
return self.config.verbose or self.config.debug


def process_response(self, response:dict):
if type(response) in [int, float, bool]:
# if the response is a number, we want to convert it to a dict
response = {'w': float(response)}
elif type(response) == dict:
response = response
else:
raise Exception(f'Response must be a number or a boolean, got {response}')

assert type(response['w']) in [int, float], f'Response weight must be a number, got {response["w"]}'
return response


def set_score_fn(self, score_fn):
assert callable(score_fn), f'Score function must be callable, got {score_fn}'
Expand Down Expand Up @@ -329,7 +339,7 @@ def eval(self, module:str = None,
else:
info = module.info(timeout=self.config.timeout)

c.print(f'🚀 :: Eval Module {info["name"]} ({info["address"]}) 🚀', color='yellow', verbose=verbose)
c.print(f'<Calling>:: {info["name"]}({info["address"]})', color='yellow', verbose=verbose)

assert 'address' in info and 'name' in info, f'Info must have a address key, got {info}'
info['staleness'] = c.time() - info.get('timestamp', 0)
Expand All @@ -338,7 +348,14 @@ def eval(self, module:str = None,
start_time = c.time()
try:
response = self.score_module(module)
response = self.process_response(response)
if type(response) in [int, float, bool]:
# if the response is a number, we want to convert it to a dict
response = {'w': float(response)}
elif type(response) == dict:
response = response
else:
raise Exception(f'Response must be a number or a boolean, got {response}')
assert type(response['w']) in [int, float], f'Response weight must be a number, got {response["w"]}'
except Exception as e:
error = c.detailed_error(e)
response = {'w': 0, 'error': error}
Expand Down
2 changes: 1 addition & 1 deletion commune/vali/vali.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ print_interval: 5 # the sleep interval between sending samples
vote_interval: 100 # the interval to vote (in blocks (8 seconds per block))

# logging
verbose: False # the verbosity of the logging
verbose: True # the verbosity of the logging
3 changes: 2 additions & 1 deletion subnet/vali.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ def score_module(self, module):
else:
return 0

def test(self, n=3, sleep_time=3):
def test(self, n=3, sleep_time=3, min_update_interval=0):
self.config.min_update_interval
test_miners = ['subnet.miner.add::test' for i in range(n)]
for miner in test_miners:
c.serve(miner)
Expand Down

0 comments on commit 161ceb1

Please sign in to comment.