Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 32 additions & 4 deletions connectonion/core/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import os
import sys
import time
import base64
from typing import List, Optional, Dict, Any, Callable, Union
from pathlib import Path
from .llm import LLM, create_llm, TokenUsage
Expand Down Expand Up @@ -218,14 +219,18 @@ def _register_event(self, event_func: EventHandler):
self.events[event_type].append(event_func)

def input(self, prompt: str, max_iterations: Optional[int] = None,
session: Optional[Dict] = None, images: list[str] | None = None) -> str:
session: Optional[Dict] = None, images: list[str] | None = None,
files: list[dict] | None = None) -> str:
"""Provide input to the agent and get response.

Args:
prompt: The input prompt or data to process
max_iterations: Override agent's max_iterations for this request
session: Optional session to continue a conversation.
images: Optional list of base64 data URLs for multimodal input
files: Optional list of file dicts with keys:
- name: filename (e.g. "report.pdf")
- data: base64-encoded data URL (e.g. "data:application/pdf;base64,...")

Returns:
The agent's response after processing the input
Expand Down Expand Up @@ -254,11 +259,34 @@ def input(self, prompt: str, max_iterations: Optional[int] = None,
# Start YAML session logging
self.logger.start_session(self.system_prompt)

# Add user message to conversation (multimodal if images provided)
if images:
# Save uploaded files to .co/uploads/ and build file path references
saved_files = []
if files:
uploads_dir = self.logger.co_dir / "uploads"
uploads_dir.mkdir(parents=True, exist_ok=True)
for f in files:
safe_name = Path(f["name"]).name
file_path = uploads_dir / safe_name
# Decode base64 data URL and write to disk
data_url = f["data"]
if "," in data_url:
raw_data = base64.b64decode(data_url.split(",", 1)[1])
else:
raw_data = base64.b64decode(data_url)
file_path.write_bytes(raw_data)
saved_files.append(str(file_path))

# Add user message to conversation (multimodal if images or files provided)
if images or saved_files:
content = [{"type": "text", "text": prompt}]
for img in images:
for img in (images or []):
content.append({"type": "image_url", "image_url": {"url": img}})
if saved_files:
file_list = "\n".join(f"- {p}" for p in saved_files)
content.append({
"type": "text",
"text": f"<system-reminder>The user uploaded the following files:\n{file_list}\nUse your read_file tool or other available tools to read the file contents before responding. Do not assume or guess the contents.</system-reminder>"
})
self.current_session['messages'].append({"role": "user", "content": content})
else:
self.current_session['messages'].append({"role": "user", "content": prompt})
Expand Down
8 changes: 7 additions & 1 deletion connectonion/network/asgi/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,13 @@ async def handle_http(

# Extract session for conversation continuation
session = data.get("session")
result = route_handlers["input"](storage, prompt, session)
images = data.get("images")
files = data.get("files")
try:
result = route_handlers["input"](storage, prompt, session, images=images, files=files)
except ValueError as e:
await send_json(send, {"error": str(e)}, 400)
return
await send_json(send, result)

elif method == "GET" and path.startswith("/sessions/"):
Expand Down
3 changes: 2 additions & 1 deletion connectonion/network/asgi/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ async def handle_websocket(
# Extract session for conversation continuation (same as HTTP)
session = data.get("session")
images = data.get("images")
files = data.get("files")

# Create IO for bidirectional communication
io = WebSocketIO()
Expand All @@ -144,7 +145,7 @@ async def handle_websocket(

def run_agent():
try:
result_holder[0] = route_handlers["ws_input"](storage, prompt, io, session, images)
result_holder[0] = route_handlers["ws_input"](storage, prompt, io, session, images, files)
except Exception as e:
error_holder[0] = str(e)
agent_done.set()
Expand Down
35 changes: 28 additions & 7 deletions connectonion/network/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,9 @@ def input(
self,
prompt: str,
timeout: float = 60.0,
on_onboard: Optional[Callable[[List[str], Optional[float]], Dict[str, Any]]] = None
on_onboard: Optional[Callable[[List[str], Optional[float]], Dict[str, Any]]] = None,
images: Optional[List[str]] = None,
files: Optional[List[Dict[str, Any]]] = None,
) -> Response:
"""
Send prompt to remote agent and get response.
Expand All @@ -205,6 +207,8 @@ def input(
Called with (methods: list[str], payment_amount: float | None).
Should return {"invite_code": "..."} or {"payment": amount}.
If None, prompts interactively in terminal.
images: Optional list of base64 data URLs for multimodal input
files: Optional list of file dicts with name and base64 data

Returns:
Response with text and done flag
Expand All @@ -229,16 +233,18 @@ def input(
except RuntimeError as e:
if "input() cannot be used" in str(e):
raise
return asyncio.run(self._stream_input(prompt, timeout, on_onboard))
return asyncio.run(self._stream_input(prompt, timeout, on_onboard, images, files))

async def input_async(
self,
prompt: str,
timeout: float = 60.0,
on_onboard: Optional[Callable[[List[str], Optional[float]], Dict[str, Any]]] = None
on_onboard: Optional[Callable[[List[str], Optional[float]], Dict[str, Any]]] = None,
images: Optional[List[str]] = None,
files: Optional[List[Dict[str, Any]]] = None,
) -> Response:
"""Async version of input()."""
return await self._stream_input(prompt, timeout, on_onboard)
return await self._stream_input(prompt, timeout, on_onboard, images, files)

def reset(self) -> None:
"""Clear conversation and start fresh."""
Expand All @@ -257,7 +263,9 @@ async def _stream_input(
self,
prompt: str,
timeout: float,
on_onboard: Optional[Callable[[List[str], Optional[float]], Dict[str, Any]]] = None
on_onboard: Optional[Callable[[List[str], Optional[float]], Dict[str, Any]]] = None,
images: Optional[List[str]] = None,
files: Optional[List[Dict[str, Any]]] = None,
) -> Response:
"""Send prompt via WebSocket and stream events."""
import websockets
Expand Down Expand Up @@ -285,7 +293,7 @@ async def _stream_input(
input_id = str(uuid.uuid4())

# Build the INPUT message
input_msg = self._build_input_message(prompt, input_id, is_direct)
input_msg = self._build_input_message(prompt, input_id, is_direct, images, files)

try:
async with websockets.connect(ws_url) as ws:
Expand Down Expand Up @@ -379,7 +387,14 @@ async def _stream_input(
self._status = "idle"
raise TimeoutError(f"Request timed out after {timeout}s")

def _build_input_message(self, prompt: str, input_id: str, is_direct: bool = False) -> Dict[str, Any]:
def _build_input_message(
self,
prompt: str,
input_id: str,
is_direct: bool = False,
images: Optional[List[str]] = None,
files: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, Any]:
"""Build INPUT message with optional signing."""
input_msg: Dict[str, Any] = {
"type": "INPUT",
Expand All @@ -396,6 +411,12 @@ def _build_input_message(self, prompt: str, input_id: str, is_direct: bool = Fal
if self._current_session:
input_msg["session"] = self._current_session

# Add multimodal attachments
if images:
input_msg["images"] = images
if files:
input_msg["files"] = files

# Sign if keys provided
if self._keys:
payload: Dict[str, Any] = {"prompt": prompt, "timestamp": input_msg["timestamp"]}
Expand Down
23 changes: 19 additions & 4 deletions connectonion/network/host/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@


def input_handler(create_agent: Callable, storage: SessionStorage, prompt: str, result_ttl: int,
session: dict | None = None, connection=None, images: list[str] | None = None) -> dict:
session: dict | None = None, connection=None, images: list[str] | None = None,
files: list[dict] | None = None) -> dict:
"""POST /input (and WebSocket /ws)

Args:
Expand All @@ -30,6 +31,7 @@ def input_handler(create_agent: Callable, storage: SessionStorage, prompt: str,
session: Optional conversation session for continuation
connection: WebSocket connection for bidirectional I/O (None for HTTP)
images: Optional list of base64 data URLs for multimodal input
files: Optional list of file dicts with name and base64 data
"""
agent = create_agent() # Fresh instance per request
agent.io = connection # WebSocket connection or None for HTTP
Expand All @@ -55,7 +57,7 @@ def input_handler(create_agent: Callable, storage: SessionStorage, prompt: str,
# TODO: If agent.input() throws, record stays "running" until TTL expires.
# This is acceptable: client gets 500 error, record expires naturally.
start = time.time()
result = agent.input(prompt, session=session, images=images)
result = agent.input(prompt, session=session, images=images, files=files)
duration_ms = int((time.time() - start) * 1000)

record.status = "done"
Expand Down Expand Up @@ -89,17 +91,22 @@ def health_handler(agent_name: str, start_time: float) -> dict:
return {"status": "healthy", "agent": agent_name, "uptime": int(time.time() - start_time)}


def info_handler(agent_metadata: dict, trust, trust_config: dict | None = None) -> dict:
def info_handler(agent_metadata: dict, trust, trust_config: dict | None = None,
host_config: dict | None = None) -> dict:
"""GET /info

Returns pre-extracted metadata including onboard requirements.
Returns pre-extracted metadata including onboard requirements and accepted inputs.

Args:
agent_metadata: Agent name, address, tools
trust: TrustAgent instance (trust level extracted via .trust attribute)
trust_config: Parsed YAML config from trust policy (optional)
host_config: Host config dict with file upload limits (optional)
"""
from ... import __version__
from .config import DEFAULT_FILE_LIMITS

file_config = host_config or DEFAULT_FILE_LIMITS

result = {
"name": agent_metadata["name"],
Expand All @@ -108,6 +115,14 @@ def info_handler(agent_metadata: dict, trust, trust_config: dict | None = None)
"model": agent_metadata.get("model", "unknown"), # Add model info
"trust": trust.trust, # Extract level string from TrustAgent
"version": __version__,
"accepted_inputs": {
"text": True,
"images": True,
"files": {
"max_file_size_mb": file_config.get("max_file_size", DEFAULT_FILE_LIMITS["max_file_size"]),
"max_files_per_request": file_config.get("max_files_per_request", DEFAULT_FILE_LIMITS["max_files_per_request"]),
},
},
}

# Add onboard info if available
Expand Down
22 changes: 14 additions & 8 deletions connectonion/network/host/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def _extract_agent_metadata(create_agent: Callable) -> tuple[dict, object]:
return metadata, sample


def _create_route_handlers(create_agent: Callable, agent_metadata: dict, result_ttl: int, trust_agent):
def _create_route_handlers(create_agent: Callable, agent_metadata: dict, result_ttl: int, trust_agent, config: dict):
"""Create route handler dict for ASGI app.

Args:
Expand All @@ -121,20 +121,25 @@ def _create_route_handlers(create_agent: Callable, agent_metadata: dict, result_
creating agents for health/info endpoints.
result_ttl: How long to keep results on server in seconds
trust_agent: TrustAgent instance for trust operations
config: Host config dict (includes file upload limits)
"""
from .config import validate_files

agent_name = agent_metadata["name"]

def handle_input(storage, prompt, session=None, connection=None, images=None):
return input_handler(create_agent, storage, prompt, result_ttl, session, connection, images)
def handle_input(storage, prompt, session=None, connection=None, images=None, files=None):
validate_files(files, config)
return input_handler(create_agent, storage, prompt, result_ttl, session, connection, images, files)

def handle_ws_input(storage, prompt, connection, session=None, images=None):
return input_handler(create_agent, storage, prompt, result_ttl, session, connection, images)
def handle_ws_input(storage, prompt, connection, session=None, images=None, files=None):
validate_files(files, config)
return input_handler(create_agent, storage, prompt, result_ttl, session, connection, images, files)

def handle_health(start_time):
return health_handler(agent_name, start_time)

def handle_info(trust, trust_config=None):
return info_handler(agent_metadata, trust, trust_config)
return info_handler(agent_metadata, trust, trust_config, config)

def handle_admin_logs():
return admin_logs_handler(agent_name)
Expand Down Expand Up @@ -417,7 +422,7 @@ def create_agent():
else:
trust_agent = TrustAgent(trust if isinstance(trust, str) else "careful")

route_handlers = _create_route_handlers(create_agent, agent_metadata, result_ttl, trust_agent)
route_handlers = _create_route_handlers(create_agent, agent_metadata, result_ttl, trust_agent, config)

# Parse trust config for /info onboard info
trust_config = _parse_trust_config(trust)
Expand Down Expand Up @@ -482,7 +487,8 @@ def create_agent():
else:
trust_agent = TrustAgent(trust if isinstance(trust, str) else "careful")

route_handlers = _create_route_handlers(create_agent, agent_metadata, result_ttl, trust_agent)
from .config import DEFAULT_FILE_LIMITS
route_handlers = _create_route_handlers(create_agent, agent_metadata, result_ttl, trust_agent, DEFAULT_FILE_LIMITS)
return asgi_create_app(
route_handlers=route_handlers,
storage=storage,
Expand Down
Loading
Loading