Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
Your Name committed May 24, 2024
1 parent e898bba commit 5174735
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 84 deletions.
19 changes: 11 additions & 8 deletions commune/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ def __init__(
**kwargs
):
self.loop = c.get_event_loop() if loop == None else loop

self.set_client(address = address, network=network)
self.serializer = c.module(serializer)()
self.key = c.get_key(key)
self.start_timestamp = c.timestamp()
self.save_history = save_history
self.history_path = history_path
self.debug = debug
self.default_fn = default_fn
self.set_client(address = address, network=network)


def prepare_request(self, args: list = None, kwargs: dict = None, params=None, message_type = "v0"):

Expand Down Expand Up @@ -132,8 +132,7 @@ def process_stream_line(line ):
event_data = json.loads(event_data)['data']
return event_data

if stream:

if stream:
async def stream_generator(response):
try:
async for line in response.content:
Expand All @@ -143,7 +142,6 @@ async def stream_generator(response):
yield event
except Exception as e:
await self.session.close()
await response.close()
return stream_generator(response)
else:
result = []
Expand Down Expand Up @@ -194,7 +192,7 @@ def prepare_url(self, address, fn):


def forward(self, *args, **kwargs):
return self.loop.run_until_complete(self.aysnc_forward(*args, **kwargs))
return self.loop.run_until_complete(self.async_forward(*args, **kwargs))

async def async_forward(self,
fn: str,
Expand All @@ -218,7 +216,7 @@ async def async_forward(self,
kwargs.update(extra_kwargs)
timestamp = c.time()
request = self.prepare_request(args=args, kwargs=kwargs, params=params, message_type=message_type)
future = await self.send_request(url=url, request=request, headers=headers, verbose=verbose, stream=stream)
result = await self.send_request(url=url, request=request, headers=headers, verbose=verbose, stream=stream)

if type(result) in [str, dict, int, float, list, tuple]:
result = self.serializer.deserialize(result)
Expand All @@ -238,7 +236,8 @@ async def async_forward(self,


def __del__(self):
self.loop.run_until_complete(self.session.close())
if hasattr(self , 'session'):
self.loop.run_until_complete(self.session.close())


def age(self):
Expand Down Expand Up @@ -396,3 +395,7 @@ def test(self, module='module::test_client'):
key = c.get_key(module)
assert info['ss58_address'] == key.ss58_address
return {'info': info, 'key': str(key)}

def __del__(self):
if hasattr(self, 'session'):
asyncio.run(self.session.close())
9 changes: 8 additions & 1 deletion commune/executor/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ def __init__(
def is_empty(self):
return self.work_queue.empty()

@property
def is_full(self):
return self.work_queue.full()




def submit(self,
fn: Callable,
Expand Down Expand Up @@ -235,5 +241,6 @@ def status(self):
return dict(
num_threads = len(self.threads),
num_tasks = self.num_tasks,
is_empty = self.is_empty
is_empty = self.is_empty,
is_full = self.is_full
)
1 change: 0 additions & 1 deletion commune/server/access.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ def run_loop(self):
r = self.sync_network()
except Exception as e:
r = c.detailed_error(e)
c.print(r)
c.sleep(self.config.sync_interval)

def sync_network(self):
Expand Down
3 changes: 2 additions & 1 deletion commune/vali/text/truthqa/vali_text_truthqa.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
network: subspace # global or local network
netuid: 0
alpha: 0.9
timeout: 2
timeout: 20
min_update_interval: 10
tag: null
target: 'answers'
search: model
Expand Down
115 changes: 48 additions & 67 deletions commune/vali/vali.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,17 @@ def epoch_info(self):

}

def start_workers(self):
for i in range(self.config.workers):
self.start_worker(i)


def run_loop(self):
c.sleep(self.config.initial_sleep)
# start the workers
self.start_time = c.time()
for i in range(self.config.workers):
self.start_worker(i)
self.start_workers()

while True:
c.sleep(self.config.print_interval)
try:
Expand All @@ -77,6 +81,8 @@ def run_loop(self):
else:
if self.vote_staleness > self.config.vote_interval:
c.print(self.vote())
if run_info['epoch']['sent_staleness'] > self.config.max_sent_staleness:
self.start_workers()
c.print(run_info)

except Exception as e:
Expand Down Expand Up @@ -116,25 +122,7 @@ def age(self):
return c.time() - self.start_time


def process_result(self, result):
w = result.get('w', 0)
address = result.get('address', 'unknown')
name = result.get('name', 'unknown')
if c.is_error(result):
self.errors += 1
else:
# record the success statistics

if result.get('w', 0) > 0:
self.successes += 1
self.last_success = c.time()
else:
self.errors += 1
c.print(f'Rewarding >>>> {result}', color='green', verbose=self.config.verbose)

return result



def worker(self,
epochs=1e9,
id=0):
Expand All @@ -155,17 +143,16 @@ def run_epoch(cls, network='local', vali=None, **kwargs):
cls = c.module('vali.'+vali)
self = cls(network=network, **kwargs)
return self.epoch()


def epoch(self, **kwargs):

def epoch(self,
**kwargs):
module_addresses = c.shuffle(list(self.namespace.values()))
c.print(f'Epoch {self.epochs} with {len(module_addresses)} modules', color='yellow')
batch_size = min(self.config.batch_size, len(module_addresses))
self.executor = c.module('executor.thread')(max_workers=self.config.threads_per_worker, maxsize=self.config.threads_per_worker)
batch_size = self.config.batch_size
self.executor = c.module('executor.thread')(max_workers=self.config.threads_per_worker,
maxsize=self.config.threads_per_worker*2)
progress_bar = c.tqdm(len(module_addresses))

self.sync(network=self.config.network)

futures = []
Expand All @@ -181,21 +168,19 @@ def epoch(self,
for future in c.as_completed(futures, timeout=self.config.timeout):
result = future.result()
futures.remove(future)
result = self.process_result(result)
results.append(result)
break
except Exception as e:
pass


for future in c.as_completed(futures, timeout=self.config.timeout):
try:
result = future.result()
except Exception as e:
result = c.detailed_error(e)
futures.remove(future)
result = self.process_result(result)
results.append(result)

return results

def network_staleness(self):
Expand Down Expand Up @@ -292,6 +277,19 @@ def next_module(self):

module2last_update = {}

def process_score(self, response):
# PROCESS THE RESPONSE
if type(response) in [int, float, bool]:
# if the response is a number, we want to convert it to a dict
response = {'w': float(response)}
elif type(response) == dict:
response = response
else:
raise Exception(f'Response must be a number or a boolean, got {response}')
assert type(response['w']) in [int, float], f'Response weight must be a number, got {response["w"]}'

return response

def eval(self, module:str,
network:str=None,
verbose_keys:List[str] = ['w', 'latency', 'name', 'address', 'ss58_address', 'path', 'staleness'],
Expand All @@ -303,9 +301,6 @@ def eval(self, module:str,
network = network or self.config.network
self.sync(network=network)

# load the module info and calculate the staleness of the module
# if the module is stale, we can just return the module info

info = {}
# RESOLVE THE NAME OF THE ADDRESS IF IT IS NOT A NAME
if module in self.name2address:
Expand All @@ -316,50 +311,36 @@ def eval(self, module:str,
info['name'] = self.address2name[module]
info['address'] = module

self.requests += 1
name = info['name']
lag = c.time() - self.module2last_update.get(name, 0) # calculate the lag

if lag < self.config.min_update_interval:
seconds_left = self.config.min_update_interval - lag
return {'success': False, 'msg': f'{lag} lag is too small please wait for {seconds_left} seconds before calling again'}
self.module2last_update[name] = self.last_sent
self.last_sent = c.time()

# CONNECT TO THE MODULE
module = c.connect(info['address'], key=self.key)
path = self.resolve_path(self.storage_path() + f"/{info['name']}")
cached_info = self.get(path, {})

if len(cached_info) > 0 :
info = cached_info
else:
info = self.get(path, {})
is_info = bool(isinstance(info, dict) and 'ss58_address' in info)
if not is_info:
info = module.info(timeout=4)
assert 'address' in info and 'name' in info, f'Info must have a address key, got {info}'
info['staleness'] = c.time() - info.get('timestamp', 0)
info['path'] = path

self.requests += 1
name = info['name']
address = info['address']
key_address = info['ss58_address']
c.print(f'Evaluating {name} (address={address},key={key_address})', color='purple')
start_time = c.time()

response = self.score_module(module)
if type(response) in [int, float, bool]:
# if the response is a number, we want to convert it to a dict
response = {'w': float(response)}
elif type(response) == dict:
response = response
else:
raise Exception(f'Response must be a number or a boolean, got {response}')
assert type(response['w']) in [int, float], f'Response weight must be a number, got {response["w"]}'

response = self.process_score(resuresponselt)
response['timestamp'] = start_time
response['latency'] = c.time() - response.get('timestamp', 0)
response['w'] = response['w'] * self.config.alpha + info.get('w', response['w']) * (1 - self.config.alpha)
response['w'] = c.round(response['w'], 3)
# merge the info with the response
info.update(response)
self.put(path, info)
if response['w'] > 0:
self.successes += 1
self.last_success = c.time()
info.update(response)
self.put(path, info)
else:
self.errors += 1
response = {k:info[k] for k in verbose_keys}
return response
c.print(f'Rewarding >>>> {result}', color='green', verbose=self.config.verbose)
return result

eval_module = eval

Expand Down Expand Up @@ -451,11 +432,11 @@ def leaderboard(self,
'staleness',
'latency'],
path = 'cache/module_infos',
max_age = None,
max_age = 3600,
min_weight = 0,
network = None,
ascending = False,
sort_by = ['w'],
ascending = True,
sort_by = ['staleness'],
to_dict = False,
n = 50,
page = None,
Expand Down
13 changes: 7 additions & 6 deletions commune/vali/vali.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,21 @@ fn : null # the function to filter

# epoch
mode: thread
batch_size: 32 # the batch size for the worker
batch_size: 64 # the batch size for the worker
workers: 1 # the number of workers
threads_per_worker: 32
timeout: 3
sleep_time: 0.05
threads_per_worker: 64
timeout: 10
sleep_time: 0.0005
refresh : True
start: True
is_main_worker: True
debug: False
min_update_interval: 30 # the minimum interval to update the network
alpha: 0.5 # the ma average
min_update_interval: 10 # the minimum interval to update the network
alpha: 1.0 # the ma average
max_leaderboard_age: 3600 # the maximum age of the leaderboard
max_history: 10
disable_voting: 1
max_sent_staleness: 10

# run loop
initial_sleep : 5 # the initial sleep time before running the loop
Expand Down

0 comments on commit 5174735

Please sign in to comment.