From 1fe81c4aa6ee4e6cb240fab4212009cfba279cf4 Mon Sep 17 00:00:00 2001 From: latentvector Date: Thu, 8 Aug 2024 23:14:50 -0400 Subject: [PATCH] vali updates --- commune/cli.py | 2 +- commune/client/client.py | 21 +- commune/module/_schema.py | 13 +- commune/module/_task.py | 60 ++ commune/module/_tree.py | 21 +- commune/module/module.py | 70 ++- commune/module/shortcuts.yaml | 1 + commune/modules/fam/__init__.py | 4 + commune/{ => modules}/miner/miner.py | 0 commune/modules/pm2/pm2.py | 2 +- commune/modules/storage/storage.py | 2 + commune/modules/subnet/vali.py | 6 +- commune/server/access.py | 3 +- commune/server/namespace.py | 5 +- commune/{ => server}/serializer/serializer.md | 0 commune/{ => server}/serializer/serializer.py | 0 .../serializer/serializers/numpy.py | 0 .../serializer/serializers/torch.py | 0 commune/vali/test.py | 4 +- commune/vali/vali.py | 584 +++++++----------- commune/vali/vali.yaml | 40 -- 21 files changed, 353 insertions(+), 485 deletions(-) create mode 100644 commune/modules/fam/__init__.py rename commune/{ => modules}/miner/miner.py (100%) rename commune/{ => server}/serializer/serializer.md (100%) rename commune/{ => server}/serializer/serializer.py (100%) rename commune/{ => server}/serializer/serializers/numpy.py (100%) rename commune/{ => server}/serializer/serializers/torch.py (100%) delete mode 100644 commune/vali/vali.yaml diff --git a/commune/cli.py b/commune/cli.py index 909ca9695..5d1675d36 100644 --- a/commune/cli.py +++ b/commune/cli.py @@ -126,7 +126,7 @@ def get_output(self, argv): if len(init_kwargs) > 0 or fn_class == 'self': print('init_kwargs', init_kwargs) module = module(**init_kwargs) - + print(module) module_name = module.module_name() fn_path = f'{module_name}/{fn}' try: diff --git a/commune/client/client.py b/commune/client/client.py index ad8d9c5b9..7920529e1 100644 --- a/commune/client/client.py +++ b/commune/client/client.py @@ -41,7 +41,7 @@ def call(cls, *args, kwargs = None, params = None, - module : str = None, + module : str = 'module', network:str = 'local', key:str = None, timeout=40, @@ -49,6 +49,9 @@ def call(cls, if '/' in str(fn): module = '.'.join(fn.split('/')[:-1]) fn = fn.split('/')[-1] + else: + module = fn + fn = 'info' client = cls.connect(module, virtual=False, network=network) return client.forward(fn=fn, args=args, @@ -114,22 +117,21 @@ def get_params(self, args=None, kwargs=None, params=None, version=1): return input - def get_url(self, fn, module, mode='http', network=None): + def get_url(self, fn, mode='http', network=None): network = network or self.network - module = module or self.module - if '/' in str(module): + if '://' in str(fn): + mode ,fn = fn.split('://') + if '/' in str(fn): module, fn = module.split('/') + else: + module = self.module if '/' in str(fn): module, fn = fn.split('/') - if '/' in module.split('://')[-1]: - module = module.split('://')[-1] - namespace = self.resolve_namespace(network) if module in namespace: module = namespace[module] url = f"{module}/{fn}/" url = f'{mode}://' + url if not url.startswith(f'{mode}://') else url - return url @@ -151,7 +153,8 @@ async def async_forward(self, mode = 'http', **extra_kwargs): key = self.resolve_key(key) - url = self.get_url(fn=fn, module=module,mode=mode, network=network) + network = network or self.network + url = self.get_url(fn=fn,mode=mode, network=network) kwargs = {**(kwargs or {}), **extra_kwargs} input = self.get_params( args=args, kwargs=kwargs, diff --git a/commune/module/_schema.py b/commune/module/_schema.py index 0b8a89e06..fbf14193a 100644 --- a/commune/module/_schema.py +++ b/commune/module/_schema.py @@ -101,11 +101,6 @@ def determine_type(cls, x): except ValueError: return x - - @classmethod - def resolve_object(cls, obj) -> Any: - return obj or cls - @classmethod def fn2code(cls, search=None, module=None)-> Dict[str, str]: @@ -400,9 +395,9 @@ def lines_of_code(cls, code:str=None): def code(cls, module = None, search=None, *args, **kwargs): if '/' in str(module) or module in cls.fns(): return cls.fn_code(module) - module = cls.resolve_object(module) - text = cls.get_text( module.pypath(), *args, **kwargs) + print(module) + text = cls.get_text( module.filepath(), *args, **kwargs) if search != None: find_lines = cls.find_lines(text=text, search=search) return find_lines @@ -494,7 +489,6 @@ def set_line(cls, idx:int, text:str): front_lines = lines[:idx] back_lines = lines[idx:] new_lines = text.split('\n') - c.print(new_lines) lines = front_lines + new_lines + back_lines else: lines[idx-1] = text @@ -1101,9 +1095,8 @@ def add_line(cls, path:str, text:str, line=None) -> None: # Write the text to the file if line != None: line=int(line) - lines = c.get_text(path).split('\n') + lines = cls.get_text(path).split('\n') lines = lines[:line] + [text] + lines[line:] - c.print(lines) text = '\n'.join(lines) with open(path, 'w') as file: diff --git a/commune/module/_task.py b/commune/module/_task.py index 768e0c41e..61ba4c5ce 100644 --- a/commune/module/_task.py +++ b/commune/module/_task.py @@ -4,6 +4,11 @@ import threading from typing import * +from typing import Union +import threading + + + class Task: @classmethod @@ -228,3 +233,58 @@ async def _asubmit(): kwargs.update(kwargs.pop('kwargs',{})) return fn(*args, **kwargs) return _asubmit() + + + + thread_map = {} + + @classmethod + def thread(cls,fn: Union['callable', str], + args:list = None, + kwargs:dict = None, + daemon:bool = True, + name = None, + tag = None, + start:bool = True, + tag_seperator:str='::', + **extra_kwargs): + + if isinstance(fn, str): + fn = cls.get_fn(fn) + if args == None: + args = [] + if kwargs == None: + kwargs = {} + + assert callable(fn), f'target must be callable, got {fn}' + assert isinstance(args, list), f'args must be a list, got {args}' + assert isinstance(kwargs, dict), f'kwargs must be a dict, got {kwargs}' + + # unique thread name + if name == None: + name = fn.__name__ + cnt = 0 + while name in cls.thread_map: + cnt += 1 + if tag == None: + tag = '' + name = name + tag_seperator + tag + str(cnt) + + if name in cls.thread_map: + cls.thread_map[name].join() + + t = threading.Thread(target=fn, args=args, kwargs=kwargs, **extra_kwargs) + # set the time it starts + setattr(t, 'start_time', cls.time()) + t.daemon = daemon + if start: + t.start() + cls.thread_map[name] = t + return t + + @classmethod + def threads(cls, search:str = None): + threads = list(cls.thread_map.keys()) + if search != None: + threads = [t for t in threads if search in t] + return threads diff --git a/commune/module/_tree.py b/commune/module/_tree.py index daba43b7d..288259ffd 100644 --- a/commune/module/_tree.py +++ b/commune/module/_tree.py @@ -15,10 +15,8 @@ def resolve_extension(cls, filename:str, extension = '.py') -> str: @classmethod def simple2path(cls, simple:str, - tree = None, extension = '.py', - - verbose = 1, + avoid_dirnames = ['', '/src', '/commune', '/commune/module', '/commune/modules', '/modules', '/blocks', '/agents', 'commune/agents'], **kwargs) -> bool: """ converts the module path to a file path @@ -33,6 +31,8 @@ def simple2path(cls, """ # if cls.libname in simple and '/' not in simple and cls.can_import_module(simple): # return simple + shortcuts = cls.shortcuts() + simple = shortcuts.get(simple, simple) if simple.endswith(extension): @@ -43,10 +43,9 @@ def simple2path(cls, path_options = [] simple = simple.replace('/', '.') - avoid_dirnames = ['', '/src', '/modules', '/commune'] - local_prefix_paths = list([pwd+x for x in avoid_dirnames]) - root_prefix_paths = list([cls.root_path + x for x in avoid_dirnames]) - dir_paths = local_prefix_paths+root_prefix_paths # pwd prefixes + dir_paths = list([pwd+x for x in avoid_dirnames]) # local first + dir_paths += list([cls.libpath + x for x in avoid_dirnames]) # add libpath stuff + for dir_path in dir_paths: # '/' count how many times the path has been split module_dirpath = dir_path + '/' + simple.replace('.', '/') @@ -58,16 +57,15 @@ def simple2path(cls, module_filepath = dir_path + '/' + cls.resolve_extension(simple.replace('.', '/'), extension=extension) path_options += [module_filepath] - for p in path_options: if os.path.exists(p): p_text = cls.get_text(p) + # gas class in text is_class_text = 'class ' in p_text or ' def ' in p_text if is_class_text: path = p break path = p - break if path != None: break @@ -439,12 +437,13 @@ def simple2objectpath(cls, object_path = object_path[len(cls.libpath)+1:] object_path = object_path.replace('.py', '') - + object_path = object_path.replace('/', '.') if object_path.startswith('.'): object_path = object_path[1:] if '.__init__' in object_path: - object_path = object_path.replace('__init__', '') + object_path = object_path.replace('.__init__', '') + object_path = object_path + '.' + classes[-1] return object_path diff --git a/commune/module/module.py b/commune/module/module.py index d7636e5dd..35c173f42 100755 --- a/commune/module/module.py +++ b/commune/module/module.py @@ -42,6 +42,7 @@ class c(*CORE_MODULES): 'is_admin', 'namespace', 'whitelist', + 'forward', 'fns'] # whitelist of helper functions to load cost = 1 description = """This is a module""" @@ -126,7 +127,6 @@ def dirpath(cls) -> str: def module_name(cls, obj=None): if hasattr(cls, 'name') and isinstance(cls.name, str): return cls.name - obj = cls.resolve_object(obj) module_file = inspect.getfile(obj) return cls.path2simple(module_file) @@ -154,6 +154,7 @@ def sandbox(cls): sand = sandbox module_cache = {} + _obj = None @classmethod def get_module(cls, path:str = 'module', @@ -191,6 +192,8 @@ def get_module(cls, if cache and cache_key in c.module_cache: module = c.module_cache[cache_key] return module + + module = c.simple2object(path) # ensure module @@ -202,23 +205,34 @@ def get_module(cls, if init_kwargs != None: module = module(**init_kwargs) - module = c.resolve_module(module) - + is_module = c.is_module(module) + if not is_module: + module = cls.obj2module(module) if cache: - c.module_cache[cache_key] = module - - - + c.module_cache[cache_key] = module return module @classmethod - def resolve_module(self,module): - if not hasattr(module, 'module_name'): - setattr(module, 'module_name', lambda: module.__name__.lower()) - - return module - + def obj2module(cls,obj): + import commune as c + class WrapperModule(c.Module): + _obj = obj + def __name__(self): + return self._obj.__name__ + def __class__(self): + return self._obj.__class__ + m = WrapperModule + + for fn in dir(obj): + try: + setattr(m, fn, getattr(obj, fn)) + except: + pass + + return m() + + @classmethod @@ -336,12 +350,11 @@ def resolve_server_name(cls, resolve_name = resolve_server_name @classmethod - def resolve_object(cls, module:str = None, **kwargs): - if module == None: - module = cls.module_name() - if isinstance(module, str): - module = c.module(module) - return module + def resolve_object(cls, obj:str = None, **kwargs): + if cls._obj != None: + return cls._obj + else: + return obj or cls def self_destruct(self): c.kill(self.server_name) @@ -815,21 +828,6 @@ def new_module( cls, return {'success': True, 'msg': f'Created module {module}', 'path': path} add_module = new_module - - @classmethod - def resolve_object(cls, module=None): - """ - Resolves the moduls from the class - Case type(module): - if None -> cls, the class method of the object - if str -> c.module({module}) - if - """ - if module == None: - module = cls - if isinstance(module, str): - module = c.module(module) - return module thread_map = {} @@ -1047,6 +1045,10 @@ def find_word(self, word:str, path='./')-> str: progress.update(1) return found_files + + + + # def update(self): # c.ip(update=1) diff --git a/commune/module/shortcuts.yaml b/commune/module/shortcuts.yaml index 51e1b59fa..c22ce2d95 100644 --- a/commune/module/shortcuts.yaml +++ b/commune/module/shortcuts.yaml @@ -18,4 +18,5 @@ w: wombo router: model.openrouter ticket: key.ticket namespace: server.namespace +serializer: server.serializer diff --git a/commune/modules/fam/__init__.py b/commune/modules/fam/__init__.py new file mode 100644 index 000000000..fa58d8c38 --- /dev/null +++ b/commune/modules/fam/__init__.py @@ -0,0 +1,4 @@ +import commune as c + +class Fam(c.Module): + fam = 1 \ No newline at end of file diff --git a/commune/miner/miner.py b/commune/modules/miner/miner.py similarity index 100% rename from commune/miner/miner.py rename to commune/modules/miner/miner.py diff --git a/commune/modules/pm2/pm2.py b/commune/modules/pm2/pm2.py index 00a54028a..cbc6f4724 100644 --- a/commune/modules/pm2/pm2.py +++ b/commune/modules/pm2/pm2.py @@ -117,7 +117,7 @@ def kill_many(cls, search=None, verbose:bool = True, timeout=10): futures = [] for name in cls.servers(search=search): c.print(f'[bold cyan]Killing[/bold cyan] [bold yellow]{name}[/bold yellow]', color='green', verbose=verbose) - f = c.submit(cls.kill, kwargs=dict(name=name, verbose=verbose), return_future=True, timeout=timeout) + f = c.submit(cls.kill, dict(name=name, verbose=verbose), return_future=True, timeout=timeout) futures.append(f) return c.wait(futures) diff --git a/commune/modules/storage/storage.py b/commune/modules/storage/storage.py index c66ef0275..ca9a6f0a0 100644 --- a/commune/modules/storage/storage.py +++ b/commune/modules/storage/storage.py @@ -14,6 +14,7 @@ def set_store_dir(self, store_dir): def resolve_item_path(self, path: str) -> str: store_dir = self.store_dir + path = str(path) path = path if path.startswith(store_dir) else f'{store_dir}/{path}' path = self.resolve_path(path) return path @@ -30,6 +31,7 @@ def file2size(self, fmt:str='b') -> int: @c.endpoint() def put_item(self, k, v: Dict, encrypt:bool=False): + k = str(k) timestamp = c.timestamp() path = self.resolve_item_path(k) # encrypt it if you want diff --git a/commune/modules/subnet/vali.py b/commune/modules/subnet/vali.py index 33c569c09..54c0f68c6 100644 --- a/commune/modules/subnet/vali.py +++ b/commune/modules/subnet/vali.py @@ -4,7 +4,7 @@ class Subnet(c.m('vali')): def __init__(self, network='local', search=None, **kwargs): self.init_vali(locals()) - def score_module(self, module): + def score(self, module): a = c.random_int() b = c.random_int() output = module.forward(a, b) @@ -13,13 +13,13 @@ def score_module(self, module): else: return 0 - def test(self, n=3, sleep_time=4): + def test(self, n=3, sleep_time=4, **kwargs): test_miners = ['subnet.miner.add::test' for i in range(n)] for miner in test_miners: c.serve(miner) test_vali = 'subnet.vali.add::test' for miner in test_miners: - c.serve(test_vali, kwargs={'network': 'local'}) + c.serve(test_vali, kwargs={'network': 'local', **kwargs}) c.print('Sleeping for 3 seconds') c.sleep(sleep_time) diff --git a/commune/server/access.py b/commune/server/access.py index c293018e0..204206443 100644 --- a/commune/server/access.py +++ b/commune/server/access.py @@ -119,7 +119,6 @@ def sync_network(self, update=False, max_age=None, netuid=None, network=None): network = network or self.config.network staleness = c.time() - state.get('sync_time', 0) self.address2key = c.address2key() - c.namespace(max_age=self.config.max_staleness) response = { 'path': self.state_path, 'max_staleness': self.config.max_staleness, @@ -134,6 +133,8 @@ def sync_network(self, update=False, max_age=None, netuid=None, network=None): else: response['msg'] = 'Synced with the network' response['staleness'] = 0 + + c.namespace(max_age=self.config.max_staleness) self.subspace = c.module('subspace')(network=network) state['stake'] = self.subspace.stakes(fmt='j', netuid=netuid, update=update, max_age=self.config.max_staleness) state['stake_from'] = self.subspace.stake_from(fmt='j', netuid=netuid, update=update, max_age=self.config.max_staleness) diff --git a/commune/server/namespace.py b/commune/server/namespace.py index 2c9c91500..f1ee60862 100644 --- a/commune/server/namespace.py +++ b/commune/server/namespace.py @@ -24,7 +24,7 @@ def namespace(cls, search=None, if netuid != None: network = f'subspace.{netuid}' - namespace = cls.get(network, {}, max_age=max_age) + namespace = cls.get(network, {}, max_age=max_age, update=update) if 'subspace' in network: if '.' in network: network, netuid = network.split('.') @@ -160,7 +160,7 @@ def check_servers(self, *args, **kwargs): def build_namespace(cls, timeout:int = 10, network:str = 'local', - verbose=True)-> dict: + verbose=False)-> dict: ''' The module port is where modules can connect with each othe. When a module is served "module.serve())" @@ -169,7 +169,6 @@ def build_namespace(cls, namespace = {} addresses = ['0.0.0.0'+':'+str(p) for p in c.used_ports()] future2address = {} - print(addresses) for address in addresses: f = c.submit(c.call, [address+'/server_name'], timeout=timeout) future2address[f] = address diff --git a/commune/serializer/serializer.md b/commune/server/serializer/serializer.md similarity index 100% rename from commune/serializer/serializer.md rename to commune/server/serializer/serializer.md diff --git a/commune/serializer/serializer.py b/commune/server/serializer/serializer.py similarity index 100% rename from commune/serializer/serializer.py rename to commune/server/serializer/serializer.py diff --git a/commune/serializer/serializers/numpy.py b/commune/server/serializer/serializers/numpy.py similarity index 100% rename from commune/serializer/serializers/numpy.py rename to commune/server/serializer/serializers/numpy.py diff --git a/commune/serializer/serializers/torch.py b/commune/server/serializer/serializers/torch.py similarity index 100% rename from commune/serializer/serializers/torch.py rename to commune/server/serializer/serializers/torch.py diff --git a/commune/vali/test.py b/commune/vali/test.py index aeaeead56..c2980efbe 100644 --- a/commune/vali/test.py +++ b/commune/vali/test.py @@ -44,10 +44,10 @@ def test_net(self, while c.time() - t0 < sleep_time: leaderboard = c.call(test_vali+'/leaderboard') - if len(leaderboard) >= n and isinstance(leaderboard, pd.DataFrame): + if len(leaderboard) >= n: break else: - c.print(f'Waiting for leaderboard to be updated {len(leaderboard)}') + c.print(f'Waiting for leaderboard to be updated {len(leaderboard)} is n={n}') c.sleep(1) leaderboard = c.call(test_vali+'/leaderboard') diff --git a/commune/vali/vali.py b/commune/vali/vali.py index 777f167b1..d038f4cbc 100644 --- a/commune/vali/vali.py +++ b/commune/vali/vali.py @@ -6,47 +6,81 @@ class Vali(c.Module): - whitelist = ['eval_module', 'score_module', 'eval', 'leaderboard'] + whitelist = ['eval_module', 'score', 'eval', 'leaderboard'] + voting_networks = ['bittensor', 'commune'] + score_fns = ['score_module', 'score', 'reward'], # the score functions def __init__(self, - config:dict=None, + # NETWORK CONFIGURATION + network= 'local', # for local subspace:test or test # for testnet subspace:main or main # for mainnet + netuid = 0, # (NOT LOCAL) the subnetwork uid or the netuid. This is a unique identifier for the subnetwork + search= None, # (OPTIONAL) the search string for the network + max_network_staleness= 10, # the maximum staleness of the network + # LOGGING CONFIGURATION + verbose= True, # the verbose mode for the worker + # WORKER EPOCH CONFIGURATION + max_size= 128, + max_workers= 64 , + batch_size= 64, + score_fn = None, + # MODULE EVAL CONFIGURATION + path= None, # the storage path for the module eval, if not null then the module eval is stored in this directory + alpha= 1.0, # alpha for score + timeout= 10, # timeout per evaluation of the module + max_staleness= 0, # the maximum staleness of the worker + epoch_time= 3600, # the maximum age of the leaderboard befor it is refreshed + min_leaderboard_weight= 0, # the minimum weight of the leaderboard + run_step_interval = 3, # the interval for the run loop to run + run_loop= True, # This is the key that we need to change to false + vote_interval= 100, # the number of iterations to wait before voting + module = None, + timeout_info= 4, # (OPTIONAL) the timeout for the info worker + + update=False, **kwargs): - self.init_vali(config=config, **kwargs) - - def set_score_fn(self, score_fn: Union[Callable, str] = None, module=None): + + config = self.set_config(locals()) + config = c.dict2munch({**Vali.config(), **config}) + self.config = config + if update: + self.config.max_staleness = 0 + self.sync(network=config.network, + search=config.search, + netuid=config.netuid, + max_network_staleness = config.max_network_staleness) + # start the run loop + if self.config.run_loop: + c.thread(self.run_loop) + init_vali = __init__ + + def set_score_fn(self, score_fn: Union[Callable, str]): """ Set the score function for the validator """ module = module or self - score_fn_options = [] - if score_fn == None: - for fn in self.config.score_fns: - if hasattr(module, fn): - score_fn_options += [fn] - score_fn = score_fn_options[0] - assert len(score_fn_options) == 1, f'Bruhhhh, you have multiple score functions {score_fn_options}, choose one fam' - else: - if isinstance(score_fn, str): - score_fn = c.get_fn(score_fn) + if isinstance(score_fn, str): + score_fn = c.get_fn(score_fn) + assert callable(score_fn) self.score = getattr(self, score_fn) return {'success': True, 'msg': 'Set score function', 'score_fn': self.score.__name__} def init_state(self): - # COUNT METRICS - self.last_start_time = 0 # the last time a worker was started - self.requests = 0 # the number of requests - self.errors = 0 # the number of errors - self.successes = 0 # the number of successes - self.epochs = 0 # the number of epochs - self.staleness_count = 0 # the number of staleness - # timestamp metrics - self.last_sync_time = 0 # the last time the network was synced - self.last_error = 0 # the last time an error occured - self.last_sent = 0 # the last time a request was sent - self.last_success = 0 # the last time a success was made - self.start_time = c.time() - self.results = [] + self.executor = c.module('executor.thread')(max_workers=self.config.max_workers, maxsize=self.config.max_size) self.futures = [] + self.state = c.dict2munch(dict( + requests = 0, + last_start_time = 0, + errors = 0, + successes = 0, + epochs = 0, + last_sync_time = 0, + last_error = 0, + last_sent = 0, + last_success = 0, + start_time = c.time(), + results = [], + futures = [], + )) def set_module(self, module:str, @@ -54,75 +88,29 @@ def set_module(self, if isinstance(module, str): module = c.module(module, **kwargs)() - does_module_have_score_fn = any([hasattr(module, fn) for fn in self.config.score_fns]) + does_module_have_score_fn = any([hasattr(module, fn) for fn in self.score_fns]) assert does_module_have_score_fn, f'Module must have a score function, got {module}' if hasattr(module, 'storage_dir'): - storage_path = self.storage_dir() + path = self.storage_dir() else: - storage_path = module.__class__.__name__ - self.config.storage_path = storage_path + path = module.__class__.__name__ + self.config.path = path self.module = module - - def init_vali(self, config=None, module=None, score_fn=None, update=0, **kwargs): - # initialize the validator - # merge the config with the default config - if module != None: - self.set_module(module, **kwargs) - config = self.set_config(config, kwargs=kwargs) - config = c.dict2munch({**Vali.config(), **config}) - config.verbose = bool(config.verbose or config.debug) - self.config = config - if update: - self.config.max_staleness = 0 - - self.init_state() - self.set_score_fn(score_fn) - self.set_network(network=self.config.network, - search=config.search, - netuid=config.netuid, - max_age = config.max_network_staleness) - # start the run loop - if self.config.run_loop: - c.thread(self.run_loop) - - init = init_vali - @property def sent_staleness(self): - return c.time() - self.last_sent + return c.time() - self.state.last_sent @property def success_staleness(self): - return c.time() - self.last_success - - def epoch_info(self): - - return { - 'requests': self.requests, - 'errors': self.errors, - 'successes': self.successes, - 'sent_staleness': self.sent_staleness, - 'success_staleness': self.success_staleness, - 'staleness_count': self.staleness_count, - 'epochs': self.epochs, - 'executor_status': self.executor.status() - - - } - - def start_workers(self): - c.print('Starting workers', color='cyan') - for i in range(self.config.workers): - self.start_worker(i) - + return c.time() - self.state.last_success @property def lifetime(self): - return c.time() - self.start_time + return c.time() - self.state.start_time @property def is_voting_network(self): - return any([v in self.config.network for v in self.config.voting_networks]) + return any([v in self.config.network for v in self.voting_networks]) @property def last_start_staleness(self): @@ -132,19 +120,11 @@ def run_step(self): """ The following runs a step in the validation loop """ - - self.sync() - run_info = self.run_info() - should_start_workers = self.success_staleness > self.config.max_success_staleness and self.last_start_staleness > self.config.max_success_staleness - if should_start_workers: - c.print('Too many stale successes, restarting workers', color='red') - self.start_workers() + self.epoch() if self.is_voting_network and self.vote_staleness > self.config.vote_interval: - buffer = '='*10 - c.print(buffer+'Voting'+buffer, color='cyan') + c.print('Voting', color='cyan') c.print(self.vote()) - c.print(run_info) - c.print(buffer+f'Epoch {self.epochs} with {self.n} modules', color='yellow'+buffer) + c.print(f'Epoch {self.state.epochs} with {self.n} modules', color='yellow') c.print(self.leaderboard()) def run_loop(self): @@ -157,7 +137,6 @@ def run_loop(self): """ self.sync() # start the workers - self.start_workers() while True: c.sleep(self.config.run_step_interval) @@ -166,54 +145,10 @@ def run_loop(self): except Exception as e: c.print(c.detailed_error(e)) - def workers(self): - if self.config.mode == None or str(self.config.mode) == 'server': - return c.servers(search=self.server_name) - elif self.config.mode == 'thread': - return c.threads(search='worker') - else: - return [] - - - def start_worker(self, id = 0, **kwargs): - self.last_start_time = c.time() - worker_name = self.worker_name(id) - if self.config.mode == 'thread': - worker = c.thread(self.worker, kwargs=kwargs, name=worker_name) - elif self.config.mode == 'process': - worker = c.process(self.worker, kwargs=kwargs, name=worker_name) - elif self.config.mode == 'server': - kwargs['config'] = self.config - worker = self.serve(kwargs=kwargs, - key=self.server_name, - server_name = self.server_name + f'::{id}',) - else: - raise Exception(f'Invalid mode {self.config.mode}') - - c.print(f'Started worker {worker}', color='cyan') - - return {'success': True, 'msg': f'Started worker {worker}', 'worker': worker} - - def worker_name(self, id = 0): - return f'worker::{id}' - def age(self): - return c.time() - self.start_time - - - - def worker(self, - epochs=1e9): - for epoch in range(int(epochs)): - try: - result = self.epoch() - except Exception as e: - result = c.detailed_error(e) - c.print(f'Leaderboard epoch={self.epochs})' , color='yellow') + return c.time() - self.state.start_time - - - def generate_finished_result(self): + def wait_for_result(self): try: for future in c.as_completed(self.futures, timeout=self.config.timeout): self.futures.remove(future) @@ -224,7 +159,7 @@ def generate_finished_result(self): msg = ' '.join([f'{k}={result[k]}' for k in result.keys()]) msg = f'ERROR({msg})' else: - result = {k: result[k] for k in self.config.result_keys} + result = {k: result.get(k, None) for k in ['w', 'name', 'key', 'address'] if k in result} msg = ' '.join([f'{k}={result[k]}' for k in result]) msg = f'SUCCESS({msg})' break @@ -232,9 +167,10 @@ def generate_finished_result(self): emoji = '🔴' result = c.detailed_error(e) msg = f'Error({result})' + c.print(emoji + msg + emoji, color='cyan', - verbose=self.config.verbose) + verbose=True) return result @@ -249,56 +185,39 @@ def run_epoch(cls, network='local', vali=None, run_loop=False, update=1, **kwarg if vali != None: cls = c.module(vali) self = cls(network=network, run_loop=run_loop, update=update, **kwargs) - return self.epoch() - - def epoch(self, df=True, **kwargs): - - try: - self.epochs += 1 - self.sync_network(**kwargs) - module_addresses = c.shuffle(list(self.namespace.values())) - c.print(f'Epoch {self.epochs} with {len(module_addresses)} modules', color='yellow') - batch_size = min(self.config.batch_size, len(module_addresses)//4) - self.executor = c.module('executor.thread')(max_workers=self.config.threads_per_worker, maxsize=self.config.max_size) - - self.sync(network=self.config.network) - - results = [] - timeout = self.config.timeout - n = len(module_addresses) - self.current_epoch = self.epochs - c.print(f'Starting epoch {self.current_epoch} with n={n}',) - - for module_address in module_addresses: - if not self.executor.is_full: - future = self.executor.submit(self.eval, kwargs={'module': module_address},timeout=timeout) - self.futures.append(future) - if len(self.futures) >= batch_size: - result = self.generate_finished_result() - results.append(result) - - - while len(self.futures) > 0: - result = self.generate_finished_result() - results.append(result) - - except Exception as e: - c.print(c.detailed_error(e), color='red') + return self.epoch(df=1) + def epoch(self, df=True): + self.state.epochs += 1 + self.sync() + module_addresses = c.shuffle(list(self.namespace.values())) + c.print(f'Epoch {self.state.epochs} with {self.n} modules', color='yellow') + batch_size = min(self.config.batch_size, len(module_addresses)//4) + results = [] + timeout = self.config.timeout + self.current_epoch = self.state.epochs + for module_address in module_addresses: + if not self.executor.is_full: + future = self.executor.submit(self.eval, {'module': module_address},timeout=timeout) + self.futures.append(future) + if len(self.futures) >= batch_size: + results.append(self.wait_for_result()) + while len(self.futures) > 0: + results.append(self.wait_for_result()) results = [r for r in results if not c.is_error(r)] if df: if len(results) > 0 and 'w' in results[0]: results = c.df(results) results = results.sort_values(by='w', ascending=False) return results - - + + @property def network_staleness(self) -> int: """ The staleness of the network """ - return c.time() - self.last_sync_time + return c.time() - self.state.last_sync_time def filter_module(self, module:str, search=None): search = search or self.config.search @@ -309,92 +228,88 @@ def filter_module(self, module:str, search=None): return all([s == None or s in module for s in search_list ]) - def set_network(self, + def sync(self, network:str=None, - netuid:int=None,**kwargs): + netuid:int=None, + search = None, + max_network_staleness=None): + self.init_state() config = self.config - config.network = network or config.network - config.netuid = netuid or config.netuid - if len(kwargs) > 0: - config = c.dict2munch({**config, **kwargs}) - - if self.network_staleness < config.max_network_staleness: + network = network or config.network + netuid = netuid or config.netuid + search = search or config.search + max_network_staleness = max_network_staleness or config.max_network_staleness + if self.network_staleness < max_network_staleness: return {'msg': 'Alredy Synced network Within Interval', 'staleness': self.network_staleness, 'max_network_staleness': self.config.max_network_staleness, - 'network': self.config.network, - 'netuid': self.config.netuid, + 'network': network, + 'netuid': netuid, 'n': self.n, - 'search': self.config.search, + 'search': search, } - c.print(f'Network(network={config.network}, netuid={config.netuid} staleness={self.network_staleness})') - self.last_sync_time = c.time() - + + self.state.last_sync_time = c.time() # RESOLVE THE VOTING NETWORKS if 'local' in config.network: # local network does not need to be updated as it is atomically updated - namespace = c.module('namespace').namespace(search=config.search, update=False) - elif config.network in ['subspace', 'main', 'test']: - if '.' in config.network: - config.network, config.netuid = config.network.split('.') - # convert the subnet to a netuid - if isinstance(config.netuid, str): - config.netuid = self.subspace.subnet2netuid(config.netuid) - if '/' in config.network: - _ , config.network = config.network.split('/') - self.subspace = c.module('subspace')(network=config.network) - config.network = 'subspace' + '/' + str(self.subspace.network) - namespace = self.subspace.namespace(netuid=config.netuid, max_age=config.max_network_staleness) + namespace = c.namespace(search=config.search, update=1) + else: - raise Exception(f'Invalid network {config.network}') + for sep in [':', '/']: + if sep in config.network: + # subtensor{sep}test + if len(config.network.split(sep)) == 2: + _ , network = config.network.split(sep) + elif len(config.network.split(sep)) == 3: + _ , network, netuid = config.network.split(sep) + netuid = int(netuid) + break + + # the network is a voting network + self.subspace = c.module('subspace')(network=network, netuid=netuid) + namespace = self.subspace.namespace(netuid=config.netuid, update=1) + + + namespace = {k: v for k, v in namespace.items() if self.filter_module(k)} + self.name2address = {k:v for k, v in namespace.items()} + self.address2name = {v: k for k, v in namespace.items()} self.namespace = namespace - self.namespace = {k: v for k, v in namespace.items() if self.filter_module(k)} self.n = len(self.namespace) - self.name2address = self.namespace - self.address2name = {k:v for v, k in namespace.items()} - self.module2name = {v: k for k, v in self.namespace.items()} + config.network = network + config.netuid = netuid self.config = config - c.print(f'Synced network {config.network} with {self.n} modules', color='green') - return self.network_info() + c.print(f'Network(network={config.network}, netuid={config.netuid} staleness={self.network_staleness})') + self.state['network'] = { + 'network': network, + 'netuid': netuid, + 'n': self.n, + 'search': search, + 'namespace': namespace, + + } + return - sync = sync_network = set_network - - @property - def verbose(self): - return self.config.verbose or self.config.debug - - def score(self, module: 'c.Module'): + def score(self, module): # assert 'address' in info, f'Info must have a address key, got {info.keys()}' - info = module.info() - assert isinstance(info, dict) and not c.is_error(info), f'Info must be a dictionary, got {info}' - return {'w': 1} - + a = c.random_int() + b = c.random_int() + expected_output = b + module.put_item(str(a),b) + output = module.get_item(str(a)) + if output == expected_output: + return 1 + + return 0 def next_module(self): return c.choice(list(self.namespace.keys())) module2last_update = {} - def get_module_path(self, module): - if module in self.address2name: - module = self.address2name[module] - path = self.resolve_path(self.storage_path() +'/'+ module) - return path - - def resolve_module_address(self, module): - """ - reoslve the module address - """ - if module in self.name2address: - address = self.name2address[module] - else: - assert module in self.address2name, f"{module} is not found in {self.config.network}" - address = module - - return address - def check_info(self, info:dict) -> bool: - return bool(isinstance(info, dict) and all([k in info for k in self.config.expected_info_keys])) + expected_info_keys = ['w', 'address', 'name', 'key'] # the keys for the expected info function + return bool(isinstance(info, dict) and all([k in info for k in expected_info_keys])) @@ -402,19 +317,16 @@ def eval(self, module:str, **kwargs): """ The following evaluates a module sver """ - self.sync() - info = {} try: info = {} # RESOLVE THE NAME OF THE ADDRESS IF IT IS NOT A NAME - address = self.resolve_module_address(module) - path = self.get_module_path(module) - module = c.connect(address, key=self.key) + path = self.resolve_path(self.path +'/'+ module) + address = self.namespace.get(module, module) + module_client = c.connect(address, key=self.key) info = self.get_json(path, {}) last_timestamp = info.get('timestamp', 0) info['staleness'] = c.time() - last_timestamp if info['staleness'] < self.config.max_staleness: - self.staleness_count += 1 raise Exception({'module': info['name'], 'msg': 'Too New', 'staleness': info['staleness'], @@ -423,82 +335,60 @@ def eval(self, module:str, **kwargs): }) # is the info valid if not self.check_info(info): - info = module.info(timeout=self.config.timeout_info) + info = module_client.info(timeout=self.config.timeout_info) + self.state.last_sent = c.time() + self.state.requests += 1 info['timestamp'] = c.timestamp() # the timestamp info['w'] = info.get('w', 0) # the weight from the module info['past_w'] = info['w'] # for calculating alpha - info['path'] = path # path of saving the module - self.last_sent = c.time() - self.requests += 1 - response = self.score(module, **kwargs) - response = self.process_response(response=response, info=info) - except Exception as e: - response = c.detailed_error(e) - response['w'] = 0 - response['name'] = info.get('name', module) - self.errors += 1 - self.last_error = c.time() # the last time an error occured - return response + # SCORE + response = self.score(module_client, **kwargs) + + # PROCESS THE SCORE + if type(response) in [int, float, bool]: + # if the response is a number, we want to convert it to a dict + response = {'w': float(response)} + if not type(response['w']) in [int, float]: + raise f'Response weight must be a number, got {response["w"]} with result : {response}' + info.update(response) + info['latency'] = c.round(c.time() - info['timestamp'], 3) + info['w'] = info['w'] * self.config.alpha + info['past_w'] * (1 - self.config.alpha) + info['history'] = info.get('history', []) + [{'w': info['w'], 'timestamp': info['timestamp']}] + # have a minimum weight to save storage of stale modules + self.state.successes += 1 + self.state.last_success = c.time() + info['staleness'] = c.round(c.time() - info.get('timestamp', 0), 3) + + if response['w'] > self.config.min_leaderboard_weight: + self.put_json(path, info) + return info - def check_response(self, response): - if type(response) in [int, float, bool]: - # if the response is a number, we want to convert it to a dict - response = {'w': float(response)} - elif type(response) == dict: - response = response - else: - raise Exception(f'Response must be a number or a boolean, got {response}') - - if not type(response['w']) in [int, float]: - raise f'Response weight must be a number, got {response["w"]} with result : {response}' - - def process_response(self, response:dict, info:dict ): - """ - Process the response from the score_module - params: - response - """ - self.check_response(response) - info.update(response) - info['latency'] = c.round(c.time() - info['timestamp'], 3) - info['w'] = info['w'] * self.config.alpha + info['past_w'] * (1 - self.config.alpha) - info['history'] = info.get('history', []) + [{'w': info['w'], 'timestamp': info['timestamp']}] - # have a minimum weight to save storage of stale modules - if info['w'] > self.config.min_leaderboard_weight: - self.put_json(info['path'], info) - self.successes += 1 - self.last_success = c.time() - info['staleness'] = c.round(c.time() - info.get('timestamp', 0), 3) - return info - + except Exception as e: + info = c.detailed_error(e) + info['w'] = 0 + info['name'] = info.get('name', module) + self.errors += 1 + self.state.last_error = c.time() # the last time an error occured + return response eval_module = eval - - def default_storage_path(self): - network = self.config.network - if 'subspace' in network: - path = f'{network}.{self.config.netuid}' - else: - path = network - return path - - def storage_path(self): + + @property + def path(self): # the set storage path in config.path is set, then the modules are saved in that directory - storage_path = self.config.get('storage_path', self.default_storage_path()) - storage_path = self.resolve_path(storage_path) - self.config.storage_path = storage_path - return self.config.storage_path + default_path = f'{self.config.network}.{self.config.netuid}' if self.is_voting_network else self.config.network + self.config.path = self.resolve_path(self.config.get('path', default_path)) + return self.config.path - def vote_info(self): try: if not self.is_voting_network: return {'success': False, 'msg': 'Not a voting network' , 'network': self.config.network , - 'voting_networks': self.voting_networks ,} + 'voting_networks': self.voting_networks} votes = self.calculate_votes() except Exception as e: votes = {'uids': [], 'weights': []} @@ -530,7 +420,7 @@ def calculate_votes(self, **kwargs): @property def votes_path(self): - return self.storage_path() + f'/votes' + return self.path + f'/votes' def vote(self,**kwargs): votes =self.calculate_votes() @@ -539,7 +429,6 @@ def vote(self,**kwargs): key=self.key, network=self.config.network, netuid=self.config.netuid, - **kwargs ) set_weights = vote @@ -551,9 +440,8 @@ def module_info(self, **kwargs): return {} def leaderboard(self, - keys = ['name', 'w', 'staleness', 'latency', 'address', 'staleness'], + keys = ['name', 'w', 'staleness', 'latency', 'address', 'staleness', 'key'], max_age = None, - network = None, ascending = True, by = 'w', to_dict = False, @@ -561,15 +449,15 @@ def leaderboard(self, page = None, **kwargs ): - max_age = max_age or self.config.max_leaderboard_age - paths = self.module_paths() + max_age = max_age or self.config.epoch_time + paths = self.paths() df = [] # chunk the jobs into batches for path in paths: r = self.get(path, {}, max_age=max_age) if isinstance(r, dict) and 'key' and r.get('w', 0) > self.config.min_leaderboard_weight : r['staleness'] = c.time() - r.get('timestamp', 0) - if not self.filter_module(r['name']): + if not self.filter_module(r.get('name', None)): continue df += [{k: r.get(k, None) for k in keys}] else : @@ -583,7 +471,6 @@ def leaderboard(self, if isinstance(by, str): by = [by] df = df.sort_values(by=by, ascending=ascending) - if n != None: if page != None: df = df[page*n:(page+1)*n] @@ -596,35 +483,22 @@ def leaderboard(self, return df - - - df = l = leaderboard - - def module_paths(self): - paths = self.ls(self.storage_path()) + def paths(self): + paths = self.ls(self.path) return paths def refresh_leaderboard(self): - storage_path = self.storage_path() - r = self.rm(storage_path) + path = self.path + r = self.rm(path) df = self.leaderboard() assert len(df) == 0, f'Leaderboard not removed {df}' - return {'success': True, 'msg': 'Leaderboard removed', 'path': storage_path} + return {'success': True, 'msg': 'Leaderboard removed', 'path': path} + + refresh = refresh_leaderboard def save_module_info(self, k:str, v:dict,): - path = self.storage_path() + f'/{k}' + path = self.path + f'/{k}' self.put(path, v) - - - def __del__(self): - workers = self.workers() - futures = [] - if self.config.mode == 'thread': - return [c.thread_map[w].cancel() for w in workers] - elif self.config.mode == 'server': - futures = [c.submit(c.kill, args=[w]) for w in workers] - return c.wait(futures, timeout=10) - @property def vote_staleness(self): @@ -636,31 +510,11 @@ def vote_staleness(self): return 0 - def network_info(self): - return { - 'search': self.config.search, - 'network': self.config.network, - 'netuid': self.config.netuid, - 'n': self.n, - 'staleness': self.network_staleness, - - } - - def run_info(self): - return { - 'network': self.network_info(), - 'epoch': self.epoch_info() , - 'vote': self.vote_info(), - - - } - - @classmethod def from_module(cls, module, config=None, - functions = ['eval_module', 'score_module', 'eval', 'leaderboard', 'run_epoch'], + functions = ['eval_module', 'score', 'eval', 'leaderboard', 'run_epoch'], **kwargs): module = c.resolve_module(module) @@ -669,7 +523,6 @@ def from_module(cls, setattr(module, fn, getattr(vali, fn)) return module - @classmethod def from_function(cls, function, @@ -678,13 +531,4 @@ def from_function(cls, setattr(vali, 'score_module', function) return vali - def print_header(self): - - module_path = self.module_name() - buffer='='*40 - c.print(buffer*2) - c.print(buffer + module_path + ' ' + buffer[len(module_path):]) - c.print(buffer*2) - - Vali.run(__name__) diff --git a/commune/vali/vali.yaml b/commune/vali/vali.yaml deleted file mode 100644 index 099827f0b..000000000 --- a/commune/vali/vali.yaml +++ /dev/null @@ -1,40 +0,0 @@ -# NETWORK CONFIGURATION -network: local # for local subspace:test or test # for testnet subspace:main or main # for mainnet -netuid: 0 # (NOT LOCAL) the subnetwork uid or the netuid. This is a unique identifier for the subnetwork -subnet : null # (OPTIONAL) the subnet name which overrides the netuid if it is not null -search: null # (OPTIONAL) the search string for the network -max_network_staleness: 60 # the maximum staleness of the network -# LOGGING CONFIGURATION -verbose: true # the verbose mode for the worker -debug : false # this is also the same as verbose (we need to change this to true) - -# WORKER EPOCH CONFIGURATION -max_size: 128 -threads_per_worker: 64 -mode: thread -search: null -batch_size: 64 -workers: 1 # the number of workers - -# MODULE EVAL CONFIGURATION -storage_path: null # the storage path for the module eval, if not null then the module eval is stored in this directory -alpha: 1.0 # -timeout: 10 # timeout per evaluation of the module -timeout_info: 4 # (OPTIONAL) the timeout for the info worker -score_fns : ['score_module', 'score', 'reward'] # the score functions -max_staleness: 60 # the maximum staleness of the worker -max_success_staleness: 100 # the maximum staleness of the worker -result_keys : ['w', 'address', 'name', 'key', 'latency', 'staleness'] # the keys for the module eval -expected_info_keys : ['w', 'address', 'name', 'key'] # the keys for the expected info function - -# LEADERBOARD CONFIGURATION -max_leaderboard_age: 3600 # the maximum age of the leaderboard befor it is refreshed -min_leaderboard_weight: 0 # the minimum weight of the leaderboard - - -# RUN LOOP CONFIGURATION for background loop -run_step_interval: 3 # the interval for the run loop to run -run_loop: true # This is the key that we need to change to false -vote_interval: 100 # the number of iterations to wait before voting - -