From bd02930d73f0847a83775aceca9b6c81b09682fa Mon Sep 17 00:00:00 2001 From: latentvector Date: Sat, 8 Jun 2024 17:28:01 -0400 Subject: [PATCH] refactor and test --- commune/cli.py | 48 ++++---- commune/module/module.py | 109 ++++++----------- commune/server/access.py | 38 +++--- commune/server/test.py | 8 +- commune/subspace/network.py | 58 ++++----- commune/subspace/subspace.py | 178 +++++++++------------------- commune/subspace/test.py | 8 +- commune/tree/tree.py | 12 +- commune/vali/vali.py | 65 +++++----- commune/vali/vali.yaml | 1 + docs/{module => basics}/1_module.md | 0 docs/{module => basics}/2_cli.md | 0 docs/{module => basics}/3_server.md | 0 docs/{module => basics}/4_key.md | 0 docs/{module => basics}/5_app.md | 0 docs/{module => basics}/6_tree.md | 0 16 files changed, 223 insertions(+), 302 deletions(-) rename docs/{module => basics}/1_module.md (100%) rename docs/{module => basics}/2_cli.md (100%) rename docs/{module => basics}/3_server.md (100%) rename docs/{module => basics}/4_key.md (100%) rename docs/{module => basics}/5_app.md (100%) rename docs/{module => basics}/6_tree.md (100%) diff --git a/commune/cli.py b/commune/cli.py index 720f13584..ea7ad0f1f 100644 --- a/commune/cli.py +++ b/commune/cli.py @@ -12,39 +12,44 @@ def __init__(self, args = None, module = 'module', verbose = True, + history_module = 'history', + path = 'history', save: bool = True): + self.verbose = verbose + self.save = save + self.history_module = c.module(history_module)(folder_path=self.resolve_path(path)) self.base_module = c.module(module) + self.base_module_attributes = list(set(self.base_module.functions() + self.base_module.get_attributes())) args = args or self.argv() + self.input_str = 'c ' + ' '.join(args) output = self.get_output(args) + self.process_output(output) + def process_output(self, output): if c.is_generator(output): for output_item in output: if isinstance(c, Munch): output_item = output_item.toDict() - c.print(output_item, verbose=verbose) + c.print(output_item, verbose=self.verbose) else: if isinstance(output, Munch): output = output.toDict() - c.print(output, verbose=verbose) + c.print(output, verbose=self.verbose) - if save and c.jsonable(output): - self.history_module().add({'input': 'c ' + ' '.join(args), 'output': output}) + if self.save and c.jsonable(output): + self.history_module.add({'input': self.input_str, 'output': output}) + return output - def get_output(self, args): - args, kwargs = self.parse_args(args) - - base_module_attributes = list(set(self.base_module.functions() + self.base_module.get_attributes())) - # is it a fucntion, assume it is for the module - # handle module/function - is_fn = args[0] in base_module_attributes + def get_output(self, args): + is_fn = args[0] in self.base_module_attributes if '/' in args[0]: args = args[0].split('/') + args[1:] is_fn = False - + if is_fn: # is a function module = self.base_module @@ -54,21 +59,19 @@ def get_output(self, args): if isinstance(module, str): module = c.module(module) fn = args.pop(0) - - if module.classify_fn(fn) == 'self': - module = module() - + module = module() fn_obj = getattr(module, fn) + args, kwargs = self.parse_args(args) + + if callable(fn_obj): output = fn_obj(*args, **kwargs) elif c.is_property(fn_obj): output = getattr(module(), fn) else: output = fn_obj - if callable(fn): - output = fn(*args, **kwargs) return output @@ -78,7 +81,6 @@ def get_output(self, args): def parse_args(cls, argv = None): if argv is None: argv = cls.argv() - args = [] kwargs = {} parsing_kwargs = False @@ -89,13 +91,13 @@ def parse_args(cls, argv = None): # args.append(cls.determine_type(arg)) if '=' in arg: parsing_kwargs = True - key, value = arg.split('=', 1) + key, value = arg.split('=') # use determine_type to convert the value to its actual type kwargs[key] = cls.determine_type(value) + else: assert parsing_kwargs is False, 'Cannot mix positional and keyword arguments' args.append(cls.determine_type(arg)) - return args, kwargs @classmethod @@ -144,10 +146,6 @@ def determine_type(cls, x): return x - @classmethod - def history_module(cls, path='history'): - return c.m('history')(folder_path=cls.resolve_path(path)) - @classmethod def history(cls,**kwargs): history = cls.history_module().history(**kwargs) diff --git a/commune/module/module.py b/commune/module/module.py index 45bd0deb3..6b375f750 100755 --- a/commune/module/module.py +++ b/commune/module/module.py @@ -1836,7 +1836,7 @@ def rm(cls, path, extension=None, mode = 'json'): return {'success':False, 'message':f'{path} does not exist'} if os.path.isdir(path): c.rmdir(path) - else: + if os.path.isfile(path): os.remove(path) assert not os.path.exists(path), f'{path} was not removed' @@ -3001,8 +3001,9 @@ def memory_usage(fmt='gb'): return (process.memory_info().rss // 1024) / scale @classmethod - def argparse(cls, verbose: bool = False, version=1): - if version == 1: + def argparse(cls, verbose: bool = False, **kwargs): + argv = ' '.join(c.argv()) + if ' --' in argv or ' -' in argv: parser = argparse.ArgumentParser(description='Argparse for the module') parser.add_argument('-fn', '--fn', dest='function', help='The function of the key', type=str, default="__init__") parser.add_argument('-kwargs', '--kwargs', dest='kwargs', help='key word arguments to the function', type=str, default="{}") @@ -3020,13 +3021,17 @@ def argparse(cls, verbose: bool = False, version=1): if len(args.params) > len(args.kwargs): args.kwargs = args.params args.args = json.loads(args.args.replace("'",'"')) - elif version == 2: - args = c.parseargs() - + else: + args = c.parse_args() return args @classmethod - def run(cls, name:str = None, verbose:bool = False, version=1) -> Any: + def parse_args(cls, argv = None, **kwargs): + return c.module('cli').parse_args(argv=argv) + + @classmethod + def run(cls, name:str = None, + version=1) -> Any: is_main = name == '__main__' or name == None or name == cls.__name__ if not is_main: return {'success':False, 'message':f'Not main module {name}'} @@ -3605,11 +3610,13 @@ def test_fns(cls, *args, **kwargs): @classmethod def test(cls, module=None, - timeout=60, + timeout=70, trials=3, parallel=True, ): module = module or cls.module_path() + if module == 'module': + return c.cmd('pytest commune', verbose=True) if c.module_exists(module + '.test'): c.print('FOUND TEST MODULE', color='yellow') module = module + '.test' @@ -4700,74 +4707,32 @@ def find_lines(self, text:str, search:str) -> List[str]: @classmethod def new_module( cls, module : str , - repo : str = None, - base_module : str = 'demo', - tree : bool = 'commune', - overwrite : bool = True, - **kwargs): + base_module : str = 'demo', + folder_module : bool = False, + update=1 + ): - """ Makes directories for path. - """ - if module == None: - assert repo != None, 'repo must be specified if module is not specified' - module = os.path.basename(repo).replace('.git','').replace(' ','_').replace('-','_').lower() - tree_path = c.tree2path().get(tree) - - class_name = '' - for m in module.split('.'): - class_name += m[0].upper() + m[1:] # capitalize first letter - - if c.module_exists(module): - if overwrite: - module_path = c.module(module).dirpath() if c.is_file_module(module) else c.module(module).filepath() - c.rm(module_path) - else: - return {'success': False, - 'path': module_path, - 'msg': f' module {module} already exists, set overwrite=True to overwrite'} - - # get the code ready from the base module - c.print(f'Getting {base_module}') base_module = c.module(base_module) - is_folder_module = base_module.is_folder_module() - - base_module_class = base_module.class_name() - module_class_name = ''.join([m[0].upper() + m[1:] for m in module.split('.')]) - - # build the path2text dictionary - if is_folder_module: - dirpath = tree_path + '/'+ module.replace('.','/') + '/' - base_dirpath = base_module.dirpath() - path2text = c.path2text( base_module.dirpath()) - path2text = {k.replace(base_dirpath +'/',dirpath ):v for k,v in path2text.items()} - else: - module_path = tree_path + '/'+ module.replace('.','/') + '.py' - code = base_module.code() - path2text = {module_path: code} - - og_path2text = c.copy(path2text) - for path, text in og_path2text.items(): - file_type = path.split('.')[-1] - is_module_python_file = (file_type == 'py' and 'class ' + base_module_class in text) - - if is_folder_module: - if file_type == 'yaml' or is_module_python_file: - path_filename = path.split('/')[-1] - new_filename = module.replace('.', '_') + '.'+ file_type - path = path[:-len(path_filename)] + new_filename - - - if is_module_python_file: - text = text.replace(base_module_class, module_class_name) - - path2text[path] = text - c.put_text(path, text) - c.print(f'Created {path} :: {module}') - - assert c.module_exists(module), f'Failed to create module {module}' + module_class_name = ''.join([m[0].capitalize() + m[1:] for m in module.split('.')]) + base_module_class_name = base_module.class_name() + base_module_code = base_module.code().replace(base_module_class_name, module_class_name) + pwd = c.pwd() + path = os.path.join(pwd, module.replace('.', '/')) + if folder_module: + dirpath = path + filename = module.replace('.', '_') + path = os.path.join(path, filename) + path = path + '.py' + dirpath = os.path.dirname(path) + if os.path.exists(path) and not update: + return {'success': True, 'msg': f'Module {module} already exists', 'path': path} + if not os.path.exists(dirpath): + os.makedirs(dirpath, exist_ok=True) - return {'success': True, 'msg': f'Created module {module}', 'path': path, 'paths': list(c.path2text(c.module(module).dirpath()).keys())} + c.put_text(path, base_module_code) + + return {'success': True, 'msg': f'Created module {module}', 'path': path} add_module = new_module diff --git a/commune/server/access.py b/commune/server/access.py index 099ff923f..8d1bbaf1a 100644 --- a/commune/server/access.py +++ b/commune/server/access.py @@ -19,13 +19,13 @@ def __init__(self, refresh: bool = False, stake_from_weight = 1.0, # the weight of the staker max_age = 30, # max age of the state in seconds - sync_interval: int = 60, # 1000 seconds per sync with the network + max_staleness: int = 60, # 1000 seconds per sync with the network **kwargs): self.set_config(locals()) self.user_module = c.module("user")() - self.state_path = state_path + self.state_path = self.resolve_path(state_path) if refresh: self.rm_state() self.last_time_synced = c.time() @@ -35,7 +35,6 @@ def __init__(self, 'fn_info': {}} self.set_module(module) - c.thread(self.run_loop) def set_module(self, module): @@ -52,30 +51,35 @@ def run_loop(self): except Exception as e: r = c.detailed_error(e) c.print(r) - c.sleep(self.config.sync_interval) + c.sleep(self.config.max_staleness) - def sync_network(self, update=False, max_age=None): - state = self.get(self.state_path, {}, max_age=self.config.sync_interval) - time_since_sync = c.time() - state.get('sync_time', 0) + def sync_network(self, update=False, max_age=None, netuid=None, network=None): + state = self.get(self.state_path, {}, max_age=self.config.max_staleness) + netuid = netuid or self.config.netuid + network = network or self.config.network + staleness = c.time() - state.get('sync_time', 0) self.key2address = c.key2address() self.address2key = c.address2key() - response = {'msg': f'synced {self.state_path}', - 'until_sync': int(self.config.sync_interval - time_since_sync), - 'time_since_sync': int(time_since_sync)} + response = { + 'path': self.state_path, + 'max_staleness': self.config.max_staleness, + 'network': network, + 'netuid': netuid, + 'staleness': int(staleness), + 'datetime': c.datetime()} - if time_since_sync < self.config.sync_interval: + if staleness < self.config.max_staleness: response['msg'] = 'synced too earlly' return response - - self.subspace = c.module('subspace')(network=self.config.network) + else: + response['msg'] = 'Synced with the network' + response['staleness'] = 0 + self.subspace = c.module('subspace')(network=network) max_age = max_age or self.config.max_age - state['stakes'] = self.subspace.stakes(fmt='j', netuid=self.config.netuid, update=update, max_age=max_age) + state['stakes'] = self.subspace.stakes(fmt='j', netuid=netuid, update=update, max_age=max_age) self.state = state self.put(self.state_path, self.state) - c.print(f'🔄 Synced {self.state_path} at {c.datetime()} 🔄\033', color='yellow') - - return response def forward(self, fn: str = 'info' , input:dict = None, address=None) -> dict: diff --git a/commune/server/test.py b/commune/server/test.py index 1d6ddd3d6..0d9023ce7 100644 --- a/commune/server/test.py +++ b/commune/server/test.py @@ -18,13 +18,13 @@ def test_basics(cls) -> dict: @classmethod - def test_serving(cls): - server_name = 'module::test' + def test_serving(cls, server_name = 'module::test'): + if server_name in c.servers(): + c.kill(server_name) module = c.serve(server_name) c.wait_for_server(server_name) module = c.connect(server_name) - - module.put("hey",1) + r = module.put("hey",1) v = module.get("hey") assert v == 1, f"get failed {v}" c.kill(server_name) diff --git a/commune/subspace/network.py b/commune/subspace/network.py index a748f53a5..a4006c3e9 100644 --- a/commune/subspace/network.py +++ b/commune/subspace/network.py @@ -87,37 +87,38 @@ def get_substrate(self, ''' substrate = None - url = self.resolve_url(url, mode=mode) - self.url = url - self.network = network - if cache: - if url in self.url2substrate: - substrate = self.url2substrate[url] - - if substrate == None: - while trials > 0: - try: - - substrate= SubstrateInterface(url=url, - websocket=websocket, - ss58_format=ss58_format, - type_registry=type_registry, - type_registry_preset=type_registry_preset, - cache_region=cache_region, - runtime_config=runtime_config, - ws_options=ws_options, - auto_discover=auto_discover, - auto_reconnect=auto_reconnect) + + + while trials > 0: + try: + url = self.resolve_url(url, mode=mode) + if cache: + if url in self.url2substrate: + substrate = self.url2substrate[url] + if substrate != None: break - except Exception as e: - trials = trials - 1 - if trials > 0: - raise e - - if cache: - self.url2substrate[url] = substrate + substrate= SubstrateInterface(url=url, + websocket=websocket, + ss58_format=ss58_format, + type_registry=type_registry, + type_registry_preset=type_registry_preset, + cache_region=cache_region, + runtime_config=runtime_config, + ws_options=ws_options, + auto_discover=auto_discover, + auto_reconnect=auto_reconnect) + if cache: + self.url2substrate[url] = substrate + break + except Exception as e: + trials = trials - 1 + if trials > 0: + raise e + + self.substrate = substrate + self.url = url return substrate @@ -139,7 +140,6 @@ def query(self, module:str='SubspaceModule', block=None, netuid = None, - network: str = network, save= True, max_age=1000, mode = 'http', diff --git a/commune/subspace/subspace.py b/commune/subspace/subspace.py index 4498e25c5..c6aa967e9 100644 --- a/commune/subspace/subspace.py +++ b/commune/subspace/subspace.py @@ -41,13 +41,15 @@ def __init__( def resolve_url(self, url = None, mode='http', **kwargs): + + network = self.resolve_network() def is_match(x): url_search_terms = [x.strip() for x in self.config.url_search.split(',')] return any([url in x for url in url_search_terms]) mode = mode or self.config.network_mode assert mode in ['http', 'ws', 'wss', 'https'] if url == None: - urls_map = getattr(self.config.urls, self.config.network) + urls_map = getattr(self.config.urls, network) urls = [] for provider, mode2url in urls_map.items(): if is_match(provider): @@ -60,7 +62,7 @@ def is_match(x): @property def substrate(self): if self._substrate == None: - self._substrate = self.get_substrate() + self.set_network() return self._substrate @substrate.setter @@ -69,7 +71,6 @@ def substrate(self, value): url2substrate = {} def get_substrate(self, - network:str = None, url : str = None, websocket:str=None, ss58_format:int=42, @@ -144,18 +145,24 @@ def set_network(self, trials = 10, url : str = None, **kwargs): - network = self.resolve_network(network) - self.network = self.config.network + self.network = self.resolve_network(network) self.substrate = self.get_substrate( url=url, mode=mode, trials=trials , **kwargs) response = {'network': self.network, 'url': self.url} return response + @property + def network(self): + return self.resolve_network(self.config.network) + + @network.setter + def network(self, value): + self.config.network = value def __repr__(self) -> str: - return f'' + return f'' def __str__(self) -> str: - return f'' + return f'' @@ -398,7 +405,7 @@ def query_map(self, name: str = 'StakeFrom', page_size=1000, max_results=100000, module='SubspaceModule', - update: bool = True, + update: bool = False, max_age : str = 1000, # max age in seconds mode = 'ws', trials = 4, @@ -420,7 +427,7 @@ def query_map(self, name: str = 'StakeFrom', if len(params) > 0 : path = path + f'::params::' + '-'.join([str(p) for p in params]) - value = self.get(path, None , max_age=max_age) + value = self.get(path, None , max_age=max_age, update=update) if value == None: # block = block or self.block @@ -493,7 +500,7 @@ def runtime_spec_version(self, network:str = 'main'): ##################################### """ Returns network SubnetN hyper parameter """ - def n(self, netuid: int = 0, network = 'main' ,block: Optional[int] = None, max_age=100, update=False, **kwargs ) -> int: + def n(self, netuid: int = 0,block: Optional[int] = None, max_age=100, update=False, **kwargs ) -> int: if netuid == 'all': return sum(self.query_map('N', block=block , update=update, max_age=max_age, **kwargs).values()) else: @@ -503,7 +510,7 @@ def n(self, netuid: int = 0, network = 'main' ,block: Optional[int] = None, max #### Account functions ### """ Returns network Tempo hyper parameter """ - def stakes(self, netuid: int = 0, block: Optional[int] = None, fmt:str='nano', max_age = 100, update=False, **kwargs) -> int: + def stakes(self, netuid: int = 0, fmt:str='nano', max_age = 100, update=False, **kwargs) -> int: stakes = self.query_map('Stake', netuid=netuid, update=update, max_age=max_age, **kwargs) if netuid == 'all': subnet2stakes = c.copy(stakes) @@ -774,15 +781,7 @@ def block(self) -> int: - - @classmethod - def archived_blocks(cls, network:str=network, reverse:bool = True) -> List[int]: - # returns a list of archived blocks - - blocks = [f.split('.B')[-1].split('.json')[0] for f in cls.glob(f'archive/{network}/state.B*')] - blocks = [int(b) for b in blocks] - sorted_blocks = sorted(blocks, reverse=reverse) - return sorted_blocks + def subnet_exists(self, subnet:str) -> bool: subnets = self.subnets() @@ -981,7 +980,6 @@ def blocks_until_vote(self, netuid=0, **kwargs): def subnet_params(self, netuid=0, - network = 'main', update = False, max_age = 1000, fmt:str='j', @@ -993,7 +991,7 @@ def subnet_params(self, features = features or self.config.subnet_features netuid = self.resolve_netuid(netuid) - path = f'query/{network}/SubspaceModule.SubnetParams.{netuid}' + path = f'query/{self.network}/SubspaceModule.SubnetParams.{netuid}' subnet_params = self.get(path, None, max_age=max_age, update=update) names = [self.feature2name(f) for f in features] name2feature = dict(zip(names, features)) @@ -1072,12 +1070,9 @@ def age(self, netuid: int = None) -> Optional[float]: def global_params(self, - network = 'main', - block : Optional[int] = None, update = False, max_age = 100000, fmt:str='j', - rows:bool = True, value_features = [ 'min_burn', 'unit_emission', 'min_weight_stake'], features = None @@ -1085,7 +1080,7 @@ def global_params(self, features = features or self.config.global_features - path = f'query/{network}/SubspaceModule.GlobalParams' + path = f'query/{self.network}/SubspaceModule.GlobalParams' subnet_params = self.get(path, None, max_age=max_age, update=update) names = [self.feature2name(f) for f in features] name2feature = dict(zip(names, features)) @@ -1160,18 +1155,19 @@ def balances(self,fmt:str = 'n', network:str = network, block: int = None, n = N def resolve_network(self, - network: Optional[int] = 'subspace:main', + network: Optional[int] = None, spliters: List[str] = [ '::', ':'], **kwargs) -> int: """ Resolve the network to use for the current session. """ + network = network or self.config.network + for spliter in spliters: if spliter in str(network): network = network.split(spliter)[-1] break - network = network or self.config.network if network == 'subspace': network = 'main' return network @@ -1281,10 +1277,7 @@ def name2uid(self, name = None, netuid: int = 0, search=None, network: str = 'ma return name2uid - @classmethod - def get_feature(cls, feature='names', network='main', netuid=0, update=False, max_age=1000, **kwargs): - return getattr(cls(network=network), feature)(netuid=netuid, update=update, max_age=max_age, **kwargs) - + def name2key(self, name:str=None, max_age=1000, timeout=30, @@ -1338,8 +1331,7 @@ def emission_per_epoch(self, netuid=None): return self.subnet(netuid=netuid)['emission']*self.epoch_time(netuid=netuid) def get_block(self, block_hash=None, max_age=8): - network = network or 'main' - path = f'cache/{network}.block' + path = f'cache/{self.network}.block' block = self.get(path, None, max_age=max_age) if block == None: block_header = self.substrate.get_block( block_hash=block_hash)['header'] @@ -1372,7 +1364,6 @@ def get_module(self, **kwargs ) -> 'ModuleInfo': url = self.resolve_url( mode=mode) - c.print(url) module_key = module if not c.valid_ss58_address(module): module_key = self.name2key(name=module, netuid=netuid, **kwargs) @@ -1561,7 +1552,7 @@ def modules(self, netuid = self.resolve_netuid(netuid or subnet) state = {} - path = f'query/{network}/SubspaceModule.Modules:{netuid}' + path = f'query/{self.network}/SubspaceModule.Modules:{netuid}' modules = self.get(path, None, max_age=max_age) if modules == None: @@ -1765,17 +1756,17 @@ def namespace(self, search=None, netuid: int = 0, update:bool = False, timeout=3 return namespace - def weights(self, netuid = 0, network = 'main', update=False, **kwargs) -> list: - weights = self.query_map('Weights',netuid=netuid, network = network, update=update, **kwargs) + def weights(self, netuid = 0, update=False, **kwargs) -> list: + weights = self.query_map('Weights',netuid=netuid, update=update, **kwargs) return weights - def proposals(self, netuid = netuid, block=None, nonzero:bool=False, update:bool = False, **kwargs): - proposals = [v for v in self.query_map('Proposals', network = 'main', block=block, update=update, **kwargs)] + def proposals(self, block=None, nonzero:bool=False, update:bool = False, **kwargs): + proposals = [v for v in self.query_map('Proposals', block=block, update=update, **kwargs)] return proposals - def save_weights(self, nonzero:bool = False, network = "main",**kwargs) -> list: - self.query_map('Weights',network = 'main', update=True, **kwargs) + def save_weights(self, **kwargs) -> list: + self.query_map('Weights', update=True, **kwargs) return {'success': True, 'msg': 'Saved weights'} def pending_deregistrations(self, netuid = 0, update=False, **kwargs): @@ -1786,7 +1777,7 @@ def num_pending_deregistrations(self, netuid = 0, **kwargs): pending_deregistrations = self.pending_deregistrations(netuid=netuid, **kwargs) return len(pending_deregistrations) - def emissions(self, netuid = 0, network = "main", block=None, update=False, **kwargs): + def emissions(self, netuid = 0, block=None, update=False, **kwargs): return self.query_vector('Emission', netuid=netuid, block=block, update=update, **kwargs) @@ -1795,7 +1786,6 @@ def emissions(self, netuid = 0, network = "main", block=None, update=False, **kw def incentives(self, netuid = 0, block=None, - network = "main", update:bool = False, **kwargs): return self.query_vector('Incentive', netuid=netuid, block=block, update=update, **kwargs) @@ -1804,7 +1794,6 @@ def incentives(self, def trust(self, netuid = 0, block=None, - network = "main", update:bool = False, **kwargs): return self.query_vector('Trust', netuid=netuid, block=block, update=update, **kwargs) @@ -1823,7 +1812,7 @@ def query_vector(self, name='Trust', netuid = 0, update=False, **kwargs): def last_update(self, netuid = 0, update=False, **kwargs): return self.query_vector('LastUpdate', netuid=netuid, update=update, **kwargs) - def dividends(self, netuid = 0, network = 'main', update=False, **kwargs): + def dividends(self, netuid = 0, update=False, **kwargs): return self.query_vector('Dividends', netuid=netuid, update=update, **kwargs) @@ -1838,10 +1827,11 @@ def registration_block(self, netuid: int = 0, update=False, **kwargs): def stake_from(self, netuid = 0, block=None, update=False, - - fmt='nano', **kwargs) -> List[Dict[str, Union[str, int]]]: + max_age=10000, + fmt='nano', + **kwargs) -> List[Dict[str, Union[str, int]]]: - stake_from = self.query_map('StakeFrom', netuid=netuid, block=block, update=update, **kwargs) + stake_from = self.query_map('StakeFrom', netuid=netuid, block=block, update=update, max_age=max_age, **kwargs) format_tuples = lambda x: [[_k, self.format_amount(_v, fmt=fmt)] for _k,_v in x] if netuid == 'all': stake_from = {netuid: {k: format_tuples(v) for k,v in stake_from[netuid].items()} for netuid in stake_from} @@ -2005,7 +1995,7 @@ def archive_history(cls, update=True, **kwargs): - path = f'history/{network}.{netuid}.json' + path = f'history/{self.network}.{netuid}.json' archive_history = [] if not update: @@ -2045,7 +2035,6 @@ def check(cls, netuid=0): def stats(self, search = None, netuid=0, - network = network, df:bool=True, update:bool = False , features : list = ['name', 'emission','incentive', 'dividends', 'stake', 'vote_staleness', 'serving', 'address'], @@ -2151,7 +2140,7 @@ def state_dict(self , block = block or self.block - path = f'state_dict/{network}.block-{block}-time-{int(c.time())}' + path = f'state_dict/{self.network}.block-{block}-time-{int(c.time())}' feature2params = {} @@ -2241,22 +2230,6 @@ def sand(cls): state = c.get(v) c.print(k, state['balances'].get(addy, 0)) - - def test_balance(self, n:int = 10, timeout:int = 10, verbose:bool = False, min_amount = 10, key=None): - key = c.get_key(key) - - balance = self.get_balance() - assert balance > 0, f'balance must be greater than 0, not {balance}' - balance = int(balance * 0.5) - c.print(f'testing network {network} with {n} transfers of {balance} each') - - - @classmethod - def fix(cls): - avoid_ports = [] - free_ports = c.free_ports(n=3, avoid_ports=avoid_ports) - avoid_ports += free_ports - def num_holders(self, **kwargs): balances = self.balances(**kwargs) return len(balances) @@ -2265,8 +2238,6 @@ def total_balance(self, **kwargs): balances = self.balances(**kwargs) return sum(balances.values()) - - """ WALLET VIBES @@ -2888,21 +2859,7 @@ def unstake_many( self, response = self.compose_call('remove_stake_multiple', params=params, key=key) return response - - def unstake2key( self, - modules = 'all', - netuid = 0, - network = network, - to = None): - if modules == 'all': - modules = self.my_modules() - else: - assert isinstance(modules, list), f"Modules must be a list of module names" - for m in modules: - assert m in self.my_modules_names(), f"Module {m} not found in your modules" - modules = [m for m in self.my_modules() if m['name'] in modules or m['key'] in modules] - c.print(f'Unstaking {len(modules)} modules') - + def unstake_all( self, key: str = None, netuid = 0, @@ -2964,7 +2921,6 @@ def staked(self, update = False, n = None, netuid = 0, - network = 'main', df = True, keys = None, max_age = 1000, @@ -3348,30 +3304,32 @@ def my_value( self, *args, **kwargs ): return sum(list(self.key2value( *args, **kwargs).values())) - def my_total_stake(self, netuid='all', network = 'main', fmt='j', update=False): + def my_total_stake(self, netuid='all', fmt='j', update=False): my_stake_to = self.my_stake_to(netuid=netuid, fmt=fmt, update=update) return sum([sum(list(v.values())) for k,v in my_stake_to.items()]) def check_valis(self, **kwargs): return self.check_servers(search='vali', **kwargs) - def check_servers(self, search='vali',update:bool=False, netuid=0, min_lag=100, timeout=30, remote=False, **kwargs): + def check_servers(self, search='vali',update:bool=False, netuid=0, max_staleness=100, timeout=30, remote=False, **kwargs): if remote: kwargs = c.locals2kwargs(locals()) return self.remote_fn('check_servers', kwargs=kwargs) module_stats = self.stats(search=search, netuid=netuid, df=False, update=update) module2stats = {m['name']:m for m in module_stats} - block = self.block response_batch = {} c.print(f"Checking {len(module2stats)} {search} servers") for module, stats in module2stats.items(): # check if the module is serving - lag = stats['vote_staleness'] - port = int(stats['address'].split(':')[-1]) - if not c.server_exists(module) or lag > min_lag: - c.print(f"Server {module} is not serving or has a lag of {lag} > {min_lag}") + should_serve = not c.server_exists(module) or stats['vote_staleness'] > max_staleness + if should_serve: + + c.print(f"Serving {module}") + port = int(stats['address'].split(':')[-1]) response_batch[module] = c.submit(c.serve, - kwargs=dict(module=module, network=f'subspace.{netuid}', port=port), + kwargs=dict(module=module, + network=f'subspace.{netuid}', + port=port), timeout=timeout) futures = list(response_batch.values()) @@ -3398,7 +3356,6 @@ def compose_call(self, nonce: int = None, remote_module: str = None, unchecked_weight: bool = False, - network = network, mode='ws', trials = 4, max_tip = 10000, @@ -3492,7 +3449,7 @@ def compose_call(self, def tx_history(self, key:str=None, mode='complete', **kwargs): key_ss58 = self.resolve_key_ss58(key) assert mode in ['pending', 'complete'] - pending_path = f'history/{network}/{key_ss58}/{mode}' + pending_path = f'history/{self.network}/{key_ss58}/{mode}' return self.glob(pending_path) def pending_txs(self, key:str=None, **kwargs): @@ -3507,7 +3464,7 @@ def clean_tx_history(self): def resolve_tx_dirpath(self, key:str=None, mode:'str([pending,complete])'='pending', **kwargs): key_ss58 = self.resolve_key_ss58(key) assert mode in ['pending', 'complete'] - pending_path = f'history/{network}/{key_ss58}/{mode}' + pending_path = f'history/{self.network}/{key_ss58}/{mode}' return pending_path def resolve_key(self, key = None): @@ -3528,31 +3485,8 @@ def resolve_key(self, key = None): assert hasattr(key, 'key'), f"Invalid Key {key} as it should have ss58_address attribute." return key - - - def unstake2key(self, key=None): - key2stake = self.key2stake() - c.print(key2stake) - def test_subnet_storage(self): - - all_subnet_params = self.subnet_params(netuid='all') - assert isinstance(all_subnet_params, list) - for subnet_params in all_subnet_params: - assert isinstance(subnet_params, dict) - subnet_params = self.subnet_params(netuid=10) - assert isinstance(subnet_params, dict) - return {'success': True, 'msg': 'All subnet params are dictionaries', 'n': len(all_subnet_params)} - - def test_global_storage(self): - global_params = self.global_params(fmt='j') - assert isinstance(global_params, dict) - return global_params - - def test_module_storage(self): - modules = self.get_modules(netuid=0) - return modules def launcher_keys(self, netuid=0, min_stake=500, **kwargs): keys = c.keys() @@ -3578,6 +3512,10 @@ def load_launcher_keys(self, amount=600, **kwargs): return c.transfer_many(amounts=amounts, destinations=destinations, **kwargs) + @classmethod + def get_feature(cls, feature='names', network='main', netuid=0, update=False, max_age=1000, **kwargs): + return getattr(cls(network=network), feature)(netuid=netuid, update=update, max_age=max_age, **kwargs) + diff --git a/commune/subspace/test.py b/commune/subspace/test.py index 30933bff9..f3c47b98d 100644 --- a/commune/subspace/test.py +++ b/commune/subspace/test.py @@ -40,9 +40,11 @@ def test_substrate(self): c.print(f'{t2-t1:.2f} seconds') - - - + def test_global_storage(self): + global_params = self.global_params(fmt='j') + assert isinstance(global_params, dict) + return global_params + diff --git a/commune/tree/tree.py b/commune/tree/tree.py index a551ff51e..3e947d67b 100644 --- a/commune/tree/tree.py +++ b/commune/tree/tree.py @@ -179,7 +179,8 @@ def resolve_tree(cls, tree:str=None): @classmethod - def path2simple(cls, path:str, ignore_prefixes = ['commune', 'modules', 'commune.modules']) -> str: + def path2simple(cls, path:str, + ignore_prefixes = ['commune', 'modules', 'commune.modules']) -> str: path = os.path.abspath(path) pwd = c.pwd() @@ -199,6 +200,8 @@ def path2simple(cls, path:str, ignore_prefixes = ['commune', 'modules', 'commune chunks = simple_path.split('.') simple_chunks = [] simple_path = '' + + # we want to remove redundant chunks for i, chunk in enumerate(chunks): if len(simple_chunks)>0: if chunk in simple_chunks: @@ -217,6 +220,7 @@ def path2simple(cls, path:str, ignore_prefixes = ['commune', 'modules', 'commune if suffix.endswith('_module'): simple_path = '.'.join(simple_path.split('.')[:-1]) # remove prefixes from commune + for prefix in ignore_prefixes: if simple_path.startswith(prefix): simple_path = simple_path.replace(prefix, '') @@ -230,12 +234,12 @@ def path2simple(cls, path:str, ignore_prefixes = ['commune', 'modules', 'commune return simple_path @classmethod - def path_config_exists(cls, path:str) -> bool: + def path_config_exists(cls, path:str, extension='.py', config_extensions=['.yaml', '.yml']) -> bool: ''' Checks if the path exists ''' - for ext in ['.yaml', '.yml']: - if os.path.exists(path.replace('.py', ext)): + for ext in config_extensions: + if os.path.exists(path.replace(extension, ext)): return True return False diff --git a/commune/vali/vali.py b/commune/vali/vali.py index fb716f50a..f8cb26ccc 100644 --- a/commune/vali/vali.py +++ b/commune/vali/vali.py @@ -6,7 +6,7 @@ class Vali(c.Module): - voting_networks: ['subspace', 'bittensor'] + voting_networks= ['subspace', 'bittensor'] score_fns = ['score_module', 'score'] # the score functions whitelist = ['eval_module', 'score_module', 'eval', 'leaderboard'] @@ -44,6 +44,7 @@ def init_vali(self, config=None, module=None, score_fn=None, **kwargs): def init_state(self): self.futures = [] # futures for the executor # COUNT METRICS + self.last_start_time = 0 # the last time a worker was started self.requests = 0 # the number of requests self.errors = 0 # the number of errors self.successes = 0 # the number of successes @@ -93,6 +94,10 @@ def lifetime(self): @property def is_voting_network(self): return not 'subspace' in self.config.network and 'bittensor' not in self.config.network + + @property + def last_start_staleness(self): + return c.time() - self.last_start_time def run_loop(self): @@ -108,12 +113,14 @@ def run_loop(self): run_info = self.run_info() if self.is_voting_network and self.vote_staleness > self.config.vote_interval: c.print(self.vote()) - if self.success_staleness > self.config.max_success_staleness: + if self.success_staleness > self.config.max_success_staleness and self.last_start_staleness > self.config.max_success_staleness: c.print('Too many stale successes, restarting workers', color='red') self.start_workers() df = self.leaderboard() - c.print(df.sort_values(by=['staleness'], ascending=False)[:42]) + if len(df) > 0: + c.print(df.sort_values(by=['staleness'], ascending=False)[:42]) c.print(run_info) + c.print(df) except Exception as e: c.print(c.detailed_error(e)) @@ -128,6 +135,7 @@ def workers(self): def start_worker(self, id = 0, **kwargs): + self.last_start_time = c.time() worker_name = self.worker_name(id) if self.config.mode == 'thread': worker = c.thread(self.worker, kwargs=kwargs, name=worker_name) @@ -154,24 +162,14 @@ def age(self): def worker(self, - epochs=1e9, - id=0): + epochs=1e9): for epoch in range(int(epochs)): try: - t0 = c.time() - self.epoch() - t1 = c.time() - latency = t1 - t0 + result = self.epoch() except Exception as e: - c.print('Dawg, theres an error in the epoch') - c.print(c.detailed_error(e)) - - @classmethod - def run_epoch(cls, network='local', vali=None, **kwargs): - if vali != None: - cls = c.module('vali.'+vali) - self = cls(network=network, **kwargs) - return self.epoch() + result = c.detailed_error(e) + c.print(f'Leaderboard epoch={self.epochs})' , color='yellow') + c.print(self.leaderboard()) @@ -193,6 +191,14 @@ def cancel_futures(self): epoch2results = {} + @classmethod + def run_epoch(cls, network='local', vali=None, **kwargs): + if vali != None: + cls = c.module('vali.'+vali) + self = cls(network=network, **kwargs) + return self.epoch() + + def epoch(self, **kwargs): try: @@ -200,7 +206,6 @@ def epoch(self, **kwargs): self.sync_network(**kwargs) module_addresses = c.shuffle(list(self.namespace.values())) c.print(f'Epoch {self.epochs} with {len(module_addresses)} modules', color='yellow') - batch_size = min(self.config.batch_size, len(module_addresses)//4) self.executor = c.module('executor.thread')(max_workers=self.config.threads_per_worker, maxsize=self.config.maxsize) @@ -223,7 +228,7 @@ def epoch(self, **kwargs): self.cancel_futures() except Exception as e: - c.print(c.detailed_error(e)) + c.print(c.detailed_error(e), color='red') return results @@ -346,16 +351,18 @@ def get_module(self, assert module in self.address2name, f"{module} is not found in {self.config.network}" name = self.address2name[module] address = module + + assert name != self.server_name, f'Cannot call the server name {self.server_name}' path = self.get_module_path(module) module = c.connect(address, key=self.key) info = self.get(path, {}) if not self.is_info(info): info = module.info(timeout=self.config.timeout_info) - info['past_timestamp'] = info.get('timestamp', 0) # for the stalnesss + info['staleness'] = c.time()- info.get('timestamp', 0) info['timestamp'] = c.timestamp() # the timestamp - info['staleness'] = info['timestamp'] - info['past_timestamp'] info['w'] = info.get('w', 0) # the weight from the module + info['count'] = info.get('count', 0) # the number of times the module has been called info['past_w'] = info['w'] # for the alpha info['path'] = path # path of saving the module info['name'] = name # name of the module cleint @@ -377,7 +384,7 @@ def eval(self, module:str, network:str=None, update=False, - verbose_keys= ['w', 'address', 'name', 'key'], + verbose_keys= ['w', 'address', 'name', 'key', 'count'], **kwargs): """ The following evaluates a module sver @@ -388,6 +395,7 @@ def eval(self, info = module.local_info self.last_sent = c.time() self.requests += 1 + c.print(info, 'INFO') response = self.score_module(module, **kwargs) response = self.process_response(response=response, info=info) response = {k:response[k] for k in verbose_keys} @@ -437,7 +445,7 @@ def process_response(self, response:dict, info:dict ): if info['w'] > self.config.min_leaderboard_weight: self.put(info['path'], info) - c.print(f'Result(w={info["w"]}, name={info["name"]} latency={c.round(info["latency"], 3)} staleness={info["staleness"]} )' , color='green') + c.print(f'Result(w={info["w"]}, name={info["name"]} latency={c.round(info["latency"], 3)} count={info["count"]} )' , color='green') self.successes += 1 self.last_success = c.time() @@ -465,7 +473,10 @@ def storage_path(self, network:Optional[str]=None): def vote_info(self): try: if not self.is_voting_network(): - return {'success': False, 'msg': 'Not a voting network', 'network': self.config.network} + return {'success': False, + 'msg': 'Not a voting network' , + 'network': self.config.network , + 'voting_networks': self.voting_networks ,} votes = self.calculate_votes() except Exception as e: votes = {'uids': [], 'weights': []} @@ -524,9 +535,7 @@ def module_info(self, **kwargs): return {} def leaderboard(self, - keys = ['name', 'w', - 'staleness', - 'latency', 'address'], + keys = ['name', 'w', 'staleness', 'latency', 'count', 'address'], max_age = None, network = None, ascending = True, diff --git a/commune/vali/vali.yaml b/commune/vali/vali.yaml index 35a46583a..47788d283 100644 --- a/commune/vali/vali.yaml +++ b/commune/vali/vali.yaml @@ -4,6 +4,7 @@ fn: null initial_sleep: 5 max_leaderboard_age: 3600 max_staleness: 60 +run_step_interval: 1 max_success_staleness: 100 maxsize: 128 min_leaderboard_weight: 0 diff --git a/docs/module/1_module.md b/docs/basics/1_module.md similarity index 100% rename from docs/module/1_module.md rename to docs/basics/1_module.md diff --git a/docs/module/2_cli.md b/docs/basics/2_cli.md similarity index 100% rename from docs/module/2_cli.md rename to docs/basics/2_cli.md diff --git a/docs/module/3_server.md b/docs/basics/3_server.md similarity index 100% rename from docs/module/3_server.md rename to docs/basics/3_server.md diff --git a/docs/module/4_key.md b/docs/basics/4_key.md similarity index 100% rename from docs/module/4_key.md rename to docs/basics/4_key.md diff --git a/docs/module/5_app.md b/docs/basics/5_app.md similarity index 100% rename from docs/module/5_app.md rename to docs/basics/5_app.md diff --git a/docs/module/6_tree.md b/docs/basics/6_tree.md similarity index 100% rename from docs/module/6_tree.md rename to docs/basics/6_tree.md