Skip to content

Commit

Permalink
modules refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
latentvector committed May 21, 2024
1 parent 6c7ebc6 commit 38b3427
Show file tree
Hide file tree
Showing 23 changed files with 664 additions and 1,551 deletions.
186 changes: 100 additions & 86 deletions commune/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
import aiohttp
import json

STREAM_PREFIX = 'data: '
BYTES_PER_MB = 1e6


class Client(c.Module):
count = 0
Expand Down Expand Up @@ -86,68 +85,85 @@ def prepare_request(self, args: list = None, kwargs: dict = None, params=None, m

return request

def iter_over_async(self, ait):
loop = asyncio.get_event_loop()
# helper async fn that just gets the next element
# from the async iterator
def get_next():
try:
obj = loop.run_until_complete(ait.__anext__())
return obj
except StopAsyncIteration:
loop.run_until_complete(self.session.close())
return 'done'
# actual sync iterator (implemented using a generator)
while True:
obj = get_next()
if obj == 'done':
break
yield obj


async def send_request(self, url:str, request: dict, headers=None, timeout:int=10, verbose=False):
async def send_request(self, url:str,
request: dict,
headers=None,
timeout:int=10,
stream = False,
verbose=False):
# start a client session and send the request

if not url.startswith('http'):
url = 'http://' + url

url = 'http://' + url if not url.startswith('http') else url
c.print(f"🛰️ Call {url} 🛰️ (🔑{self.key.ss58_address})", color='green', verbose=verbose)

async with aiohttp.ClientSession() as session:
async with session.post(url, json=request, headers=headers) as response:

if response.content_type == 'application/json':
result = await asyncio.wait_for(response.json(), timeout=timeout)

elif response.content_type == 'text/plain':
result = await asyncio.wait_for(response.text(), timeout=timeout)

elif response.content_type == 'text/event-stream':
if self.debug:
progress_bar = c.tqdm(desc='MB per Second', position=0)
result = {}
async for line in response.content:

event_data = line.decode('utf-8')
event_bytes = len(event_data)

if self.debug :
progress_bar.update(event_bytes/(BYTES_PER_MB))

# remove the "data: " prefix
if event_data.startswith(STREAM_PREFIX):
event_data = event_data[len(STREAM_PREFIX):]

event_data = event_data.strip()

# skip empty lines
if event_data == "":
continue

# if the data is formatted as a json string, load it {data: ...}
if isinstance(event_data, bytes):
event_data = event_data.decode('utf-8')

# if the data is formatted as a json string, load it {data: ...}
if isinstance(event_data, str):
if event_data.startswith('{') and event_data.endswith('}') and 'data' in event_data:
event_data = json.loads(event_data)['data']
result += [event_data]

# process the result if its a json string
if result.startswith('{') and result.endswith('}') or \
result.startswith('[') and result.endswith(']'):
self.session = aiohttp.ClientSession()
response = await self.session.post(url, json=request, headers=headers)
if response.content_type == 'application/json':
result = await asyncio.wait_for(response.json(), timeout=timeout)
elif response.content_type == 'text/plain':
result = await asyncio.wait_for(response.text(), timeout=timeout)
elif response.content_type == 'text/event-stream':
STREAM_PREFIX = 'data: '

def process_stream_line(line ):
event_data = line.decode('utf-8')
event_data = event_data[len(STREAM_PREFIX):] if event_data.startswith(STREAM_PREFIX) else event_data
event_data = event_data.strip() # remove leading and trailing whitespaces
if event_data == "": # skip empty lines if the event data is empty
return ''
if isinstance(event_data, str):
if event_data.startswith('{') and event_data.endswith('}') and 'data' in event_data:
event_data = json.loads(event_data)['data']
return event_data

if stream:

async def stream_generator(response):
try:
async for line in response.content:
event = process_stream_line(line)
if event == '':
continue
yield event
except Exception as e:
await self.session.close()
await response.close()
return stream_generator(response)
else:
result = []
async for line in response.content:
event = process_stream_line(line)
if event == '':
continue
result += [event]
# process the result if its a json string
if isinstance(result, str):
if (result.startswith('{') and result.endswith('}')) or result.startswith('[') and result.endswith(']'):
result = ''.join(result)
result = json.loads(result)
else:
raise ValueError(f"Invalid response content type: {response.content_type}")
if type(result) in [str, dict]:
result = self.serializer.deserialize(result)
if isinstance(result, dict) and 'data' in result:
result = result['data']

else:
raise ValueError(f"Invalid response content type: {response.content_type}")

await self.session.close()

return result


Expand Down Expand Up @@ -176,8 +192,12 @@ def prepare_url(self, address, fn):
address = address.split('://')[-1]
url = f"{address}/{fn}/"
return url


async def async_forward(self, *args, **kwargs):
return self.forward(*args, **kwargs)

async def async_forward(self,
def forward(self,
fn: str,
args: list = None,
kwargs: dict = None,
Expand All @@ -188,27 +208,30 @@ async def async_forward(self,
message_type = "v0",
key : str = None,
verbose = False,
stream = False,
**extra_kwargs
):
key = self.resolve_key(key)
url = self.prepare_url(address, fn)
# resolve the kwargs at least
kwargs =kwargs or {}
kwargs.update(extra_kwargs)
timestamp = c.time()
request = self.prepare_request(args=args, kwargs=kwargs, params=params, message_type=message_type)
result = await self.send_request(url=url, request=request, headers=headers, timeout=timeout, verbose=verbose)
result = asyncio.run(self.send_request(url=url, request=request, headers=headers, timeout=timeout, verbose=verbose, stream=stream))

if self.save_history:
input = self.serializer.deserialize(request)
path = self.history_path+ '/' + self.key.ss58_address + '/' + self.address+ '/'+ str(input['timestamp'])
output = {
'address': address,
'fn': fn,
'input': input,
'result': result,
'latency': c.time() - input['timestamp'],
}
self.put(path, output)
if type(result) in [str, dict, int, float, list, tuple]:
result = self.serializer.deserialize(result)
if isinstance(result, dict) and 'data' in result:
result = result['data']
latency = c.time() - timestamp
if self.save_history:
output = { 'input': request, 'output': result, 'latency': latency}
path = self.history_path+ '/' + self.key.ss58_address + '/' + self.address+ '/'+ str(timestamp)
self.put(path, output)
else:
result = self.iter_over_async(result)

return result


Expand Down Expand Up @@ -243,26 +266,18 @@ def history(cls, key=None, history_path='history'):
key = c.get_key(key)
return cls.ls(history_path + '/' + key.ss58_address)


def forward(self,*args,return_future:bool=False, timeout:str=4, **kwargs):
forward_future = asyncio.wait_for(self.async_forward(*args, **kwargs), timeout=timeout)
if return_future:
return forward_future
else:
return self.loop.run_until_complete(forward_future)



@classmethod
def call(cls, module : str,
fn:str = None,
*args,
timeout : int = 10,
kwargs = None,
params = None,
prefix_match:bool = False,
network:str = 'local',
key:str = None,
kwargs = None,
params = None,
stream = False,
**extra_kwargs) -> None:

if '//' in module:
Expand All @@ -280,13 +295,12 @@ def call(cls, module : str,
key=key)
# if isinstance(kwargs, str):
# kwargs = c.str2dict(kwargs)
if params != None:
kwargs = params
if kwargs == None:
kwargs = {}
kwargs.update(extra_kwargs)
return asyncio.run(module.async_forward(fn=fn,
args=args,
kwargs=kwargs,
params=params))
return module.forward(fn=fn, args=args, kwargs=kwargs, stream=stream)


@classmethod
Expand Down
17 changes: 16 additions & 1 deletion commune/code/code.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,4 +185,19 @@ def file2file(self, path, **kwargs):
content = self.model.forward(content, **kwargs)
c.put_text(path, content)
return content



@staticmethod
def get_files_code(directory):
code_dict = {}

for root, dirs, files in os.walk(directory):
for file in files:
file_path = os.path.join(root, file)
relative_path = os.path.relpath(file_path, directory)

with open(file_path, 'r') as f:
code = f.read()
code_dict[relative_path] = code

return code_dict
Loading

0 comments on commit 38b3427

Please sign in to comment.