Skip to content

Commit

Permalink
vali merge and ticket update
Browse files Browse the repository at this point in the history
  • Loading branch information
latentvector committed May 29, 2024
2 parents 6fd5590 + e477ca9 commit 3a36647
Show file tree
Hide file tree
Showing 17 changed files with 625 additions and 355 deletions.
18 changes: 7 additions & 11 deletions commune/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ def get_next():
obj = self.loop.run_until_complete(ait.__anext__())
return obj
except StopAsyncIteration:
self.loop.run_until_complete(self.session.close())
return 'done'
# actual sync iterator (implemented using a generator)
while True:
Expand All @@ -112,7 +111,8 @@ async def send_request(self, url:str,
# start a client session and send the request
url = 'http://' + url if not url.startswith('http') else url
c.print(f"🛰️ Call {url} 🛰️ (🔑{self.key.ss58_address})", color='green', verbose=verbose)
self.session = aiohttp.ClientSession()
if not hasattr(self, 'session'):
self.session = aiohttp.ClientSession()
response = await self.session.post(url, json=request, headers=headers)
if response.content_type == 'application/json':
result = await asyncio.wait_for(response.json(), timeout=timeout)
Expand All @@ -134,14 +134,11 @@ def process_stream_line(line ):

if stream:
async def stream_generator(response):
try:
async for line in response.content:
event = process_stream_line(line)
if event == '':
continue
yield event
except Exception as e:
await self.session.close()
async for line in response.content:
event = process_stream_line(line)
if event == '':
continue
yield event
return stream_generator(response)
else:
result = []
Expand All @@ -159,7 +156,6 @@ async def stream_generator(response):
else:
raise ValueError(f"Invalid response content type: {response.content_type}")

await self.session.close()

return result

Expand Down
8 changes: 7 additions & 1 deletion commune/executor/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class ThreadPoolExecutor(c.Module):
def __init__(
self,
max_workers: int =None,
maxsize : int =200 ,
maxsize : int = None ,
thread_name_prefix : str ="",
):
"""Initializes a new ThreadPoolExecutor instance.
Expand All @@ -39,8 +39,10 @@ def __init__(
execute the given calls.
thread_name_prefix: An optional name prefix to give our threads.
"""
self.start_time = c.time()

max_workers = (os.cpu_count() or 1) * 5 if max_workers == None else max_workers
maxsize = max_workers or None
if max_workers <= 0:
raise ValueError("max_workers must be greater than 0")

Expand All @@ -62,6 +64,10 @@ def is_full(self):
return self.work_queue.full()


def default_priority_score(self):
# older scores are higher priority
return 1 # abs((self.start_time - c.time()))



def submit(self,
Expand Down
62 changes: 62 additions & 0 deletions commune/function.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@

import commune as c

class Function(c.Module):
@classmethod
def fn_schema(cls, fn:str,
defaults:bool=True,
code:bool = False,
docs:bool = True,
version=2)->dict:
'''
Get function schema of function in cls
'''
fn_schema = {}
fn = cls.get_fn(fn)
fn_schema['input'] = cls.get_function_annotations(fn=fn)

for k,v in fn_schema['input'].items():
v = str(v)
if v.startswith('<class'):
fn_schema['input'][k] = v.split("'")[1]
elif v.startswith('typing.'):
fn_schema['input'][k] = v.split(".")[1].lower()
else:
fn_schema['input'][k] = v

fn_schema['output'] = fn_schema['input'].pop('return', {})

if docs:
fn_schema['docs'] = fn.__doc__
if code:
fn_schema['code'] = cls.fn_code(fn)

fn_args = c.get_function_args(fn)
fn_schema['type'] = 'static'
for arg in fn_args:
if arg not in fn_schema['input']:
fn_schema['input'][arg] = 'NA'
if arg in ['self', 'cls']:
fn_schema['type'] = arg
fn_schema['input'].pop(arg)
if 'default' in fn_schema:
fn_schema['default'].pop(arg, None)


if defaults:
fn_schema['default'] = cls.fn_defaults(fn=fn)
for k,v in fn_schema['default'].items():
if k not in fn_schema['input'] and v != None:
fn_schema['input'][k] = type(v).__name__ if v != None else None

if version == 1:
pass
elif version == 2:
defaults = fn_schema.pop('default', {})
fn_schema['input'] = {k: {'type':v, 'default':defaults.get(k)} for k,v in fn_schema['input'].items()}
else:
raise Exception(f'Version {version} not implemented')


return fn_schema

14 changes: 11 additions & 3 deletions commune/module/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -2467,6 +2467,8 @@ def serve(cls,
module = cls.module_path()
kwargs = kwargs or {}
kwargs.update(extra_kwargs or {})
if kwargs.get('debug', False):
remote = False
name = server_name or name # name of the server if None, it will be the module name
name = cls.resolve_server_name(module=module, name=name, tag=tag, tag_seperator=tag_seperator)
if tag_seperator in name:
Expand Down Expand Up @@ -2718,8 +2720,6 @@ def fn_schema(cls, fn:str,
fn = cls.get_fn(fn)
fn_schema['input'] = cls.get_function_annotations(fn=fn)



for k,v in fn_schema['input'].items():
v = str(v)
if v.startswith('<class'):
Expand Down Expand Up @@ -6059,6 +6059,10 @@ def total_supply(self, *args, **kwargs):
@classmethod
def update_module(cls, *args, **kwargs):
return c.module('subspace')().update_module(*args, **kwargs)

@classmethod
def update_modules(cls, *args, **kwargs):
return c.module('subspace')().update_modules(*args, **kwargs)

@classmethod
def set_weights(cls, *args, **kwargs):
Expand Down Expand Up @@ -6624,6 +6628,7 @@ def thread(cls,fn: Union['callable', str],
assert isinstance(args, list), f'args must be a list, got {args}'
assert isinstance(kwargs, dict), f'kwargs must be a dict, got {kwargs}'

# unique thread name
if name == None:
name = fn.__name__
cnt = 0
Expand All @@ -6633,8 +6638,10 @@ def thread(cls,fn: Union['callable', str],
tag = ''
name = name + tag_seperator + tag + str(cnt)

t = threading.Thread(target=fn, args=args, kwargs=kwargs, **extra_kwargs)
if name in cls.thread_map:
cls.thread_map[name].cancel()

t = threading.Thread(target=fn, args=args, kwargs=kwargs, **extra_kwargs)
# set the time it starts
setattr(t, 'start_time', c.time())
t.daemon = daemon
Expand Down Expand Up @@ -7001,6 +7008,7 @@ def ticket(self, *args, **kwargs):
return c.module('ticket')().create(*args, **kwargs)

def save_ticket(self, key=None, **kwargs):

key = c.get_key(key)
return key.save_ticket(**kwargs)

Expand Down
6 changes: 5 additions & 1 deletion commune/namespace.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,11 @@ def merge_namespace(cls, from_network:str, to_network:str, module = None):
return {'success': True, 'msg': f'Namespace {from_network} merged into {to_network}.'}

@classmethod
def add_server(cls, address:str, name=None, network:str = 'local',timeout:int=10, **kwargs):
def add_server(cls, address:str, name=None, network:str = 'local',timeout:int=4, **kwargs):

"""
Add a server to the namespace.
"""
module = c.connect(address)
info = module.info(timeout=timeout)
name = info['name'] if name == None else name
Expand Down
15 changes: 11 additions & 4 deletions commune/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,20 @@ def __init__(

def forward(self, fn:str, input:dict):
"""
OPTION 1:
fn (str): the function to call
input (dict): the input to the function
data: the data to pass to the function
kwargs/params (Optional): the keyword arguments to pass to the function
args (optional): the positional arguments to pass to the function
timestamp: the timestamp of the request
address: the address of the caller (ss58_address)
signature: the signature of the request
signature: the signature of the request
OPTION 2
input (dict): the input to the function
**kwargs, # the params
access_token: {timestamp}::{address}::{signature}
"""
user_info = None
Expand All @@ -64,14 +70,15 @@ def forward(self, fn:str, input:dict):
{timestamp}::signature::{signature}::address::{address}
"""
assert self.key.verify(input['access_token']), f"Data not signed with correct key"
timestamp = int(input['module_ticket'].split('::signature::')[0])
timestamp = int(input['access_token'].split('::signature::')[0])
if all([k not in input for k in ['kwargs', 'params', 'args']]):
"""
We assume the data is in the input, and the token
"""
kwargs = input
kwargspop('access_token')
input['kwargs'] = input


if 'params' in input:
# if the params are in the input, we want to move them to the data
if isinstance(input['params'], dict):
Expand Down Expand Up @@ -168,7 +175,7 @@ def set_module(self, module,
module.network = self.network
module.subnet = self.subnet
self.schema = module.schema()
self.key = self.module.key = c.get_key(key or self.name)
self.key = self.module.key = c.get_key(key or self.name, create_if_not_exists=True)
self.access_module = c.module(access_module)(module=self.module)
self.set_api()
return {'success': True, 'msg': f'Set module {module}', 'key': self.key.ss58_address}
Expand Down
14 changes: 8 additions & 6 deletions commune/server/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,17 @@ def test_serving(cls):


@classmethod
def test_serving_with_different_key(cls, module_name = 'storage::test', key='test'):
module = c.serve(module_name, key=key)
while not c.server_exists(module_name):
c.sleep(1)
c.print('waiting for server to start')
def test_serving_with_different_key(cls, module_name = 'storage::test', key_name='storage::test2'):
c.add_key(key_name)
module = c.serve(module_name, key=key_name)
key = c.get_key(key_name)
c.sleep(1)
module = c.connect(module_name)
info = module.info()
key = c.get_key(key)

assert info['ss58_address'] == key.ss58_address, f"key failed {key.ss58_address} != {info['ss58_address']}"
c.kill(module_name)

c.rm_key(key_name)
return {'success': True, 'msg': 'server test passed'}

Loading

0 comments on commit 3a36647

Please sign in to comment.