Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions extensions/metaclaw-openclaw/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,11 @@ function patchFetchForTrainingHeaders(
api.on("before_prompt_build", (_event, ctx) => {
const sessionId = ctx.sessionId ?? "";
const turnType = SIDE_TRIGGERS.has(ctx.trigger ?? "") ? "side" : "main";
const agentId = ctx.agentId ?? "";
pendingHeaders = {
[config.sessionIdHeader]: sessionId,
[config.turnTypeHeader]: turnType,
"X-Agent-Id": agentId,
};
return {};
});
Expand Down
154 changes: 129 additions & 25 deletions metaclaw/api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,11 @@ def __init__(
# Buffer turns per session for memory ingestion (only cleared on session_done)
self._session_memory_turns: dict[str, list] = {}
self._session_memory_scopes: dict[str, str] = {}
# Track agent_id per session for evolution routing
self._session_agent_ids: dict[str, str] = {}

# Per-agent SkillManager cache: agent_id → SkillManager
self._agent_skill_managers: dict[str, SkillManager] = {}

# OPD teacher model client
self._teacher_client: Optional[OpenAI] = None
Expand Down Expand Up @@ -622,6 +627,7 @@ async def chat_completions(
x_memory_scope: Optional[str] = Header(default=None),
x_user_id: Optional[str] = Header(default=None),
x_workspace_id: Optional[str] = Header(default=None),
x_agent_id: Optional[str] = Header(default=None),
):
owner: MetaClawAPIServer = request.app.state.owner
# Update idle tracker so the scheduler knows the user is active
Expand Down Expand Up @@ -680,13 +686,16 @@ async def chat_completions(
workspace_id=_explicit_workspace,
)

agent_id = (x_agent_id or "").strip() or "_default"

stream = bool(body.get("stream", False))
result = await owner._handle_request(
body,
session_id=session_id,
turn_type=turn_type,
session_done=session_done,
memory_scope=memory_scope,
agent_id=agent_id,
)
if stream:
return StreamingResponse(
Expand Down Expand Up @@ -1144,6 +1153,7 @@ async def _handle_request(
turn_type: str,
session_done: bool,
memory_scope: str = "",
agent_id: str = "_default",
) -> dict[str, Any]:
messages = body.get("messages")
if not isinstance(messages, list) or not messages:
Expand Down Expand Up @@ -1173,7 +1183,10 @@ def _prompt_len(msgs):
cached_system = ""
# NOTE: In skills_only mode we forward directly to the user's LLM provider.
# Do not rewrite/collapse the system prompt here.
if self.config.mode != "skills_only":
# Resolve per-agent mode for system prompt handling
_agent_cfg_sp = (getattr(self.config, "agents", None) or {}).get(agent_id, {})
_effective_mode_sp = _agent_cfg_sp.get("mode", self.config.mode) if isinstance(_agent_cfg_sp, dict) else self.config.mode
if _effective_mode_sp != "skills_only":
cached_system = self._read_cached_system_prompt()
if not cached_system:
raw_system = ""
Expand All @@ -1200,20 +1213,23 @@ def _prompt_len(msgs):
if effective_memory_scope:
self._session_memory_scopes[session_id] = effective_memory_scope

# Resolve per-agent skill manager
agent_sm = self._get_agent_skill_manager(agent_id)

# Inject memory and skills into system message for main turns
if turn_type == "main":
if (
self.memory_manager
and self.skill_manager
and agent_sm
and self.config.synergy_enabled
):
messages = await self._inject_augmentation(
messages, scope_id=effective_memory_scope,
messages, scope_id=effective_memory_scope, skill_manager=agent_sm,
)
elif self.memory_manager:
messages = await self._inject_memory(messages, scope_id=effective_memory_scope)
elif self.skill_manager:
messages = self._inject_skills(messages)
elif agent_sm:
messages = self._inject_skills(messages, skill_manager=agent_sm)
if cached_system:
logger.info(
"[OpenClaw] system prompt cached len=%d",
Expand All @@ -1234,7 +1250,11 @@ def _prompt_len(msgs):
forward_body["model"] = self._served_model
forward_body["messages"] = _ensure_reasoning_content(messages)

if self.config.mode == "skills_only":
# Resolve per-agent mode: check agents config, fall back to global mode
_agent_cfg = (getattr(self.config, "agents", None) or {}).get(agent_id, {})
_effective_mode = _agent_cfg.get("mode", self.config.mode) if isinstance(_agent_cfg, dict) else self.config.mode

if _effective_mode == "skills_only":
output = await self._forward_to_llm(forward_body)
else:
output = await self._forward_to_tinker(forward_body)
Expand Down Expand Up @@ -1287,6 +1307,9 @@ def _prompt_len(msgs):
"prompt_text": prompt_text_simple,
"response_text": response_text_simple,
}
# Track agent_id for this session
self._session_agent_ids[session_id] = agent_id

evolution_every_n = getattr(self.config, "skill_evolution_every_n_turns", 10)
_want_evolution = self.skill_evolver and self.config.enable_skill_evolution and evolution_every_n > 0
_want_memory = self.memory_manager is not None
Expand All @@ -1296,7 +1319,7 @@ def _prompt_len(msgs):
if len(buf) >= evolution_every_n:
evolution_turns = list(buf)
self._session_turns[session_id] = []
self._safe_create_task(self._evolve_skills_for_session(evolution_turns))
self._safe_create_task(self._evolve_skills_for_session(evolution_turns, agent_id=agent_id))
if _want_memory:
self._session_memory_turns.setdefault(session_id, []).append(turn_entry)
# session_done handling for skills_only path (tokenizer unavailable)
Expand All @@ -1306,7 +1329,8 @@ def _prompt_len(msgs):
eff = self._session_effective.pop(session_id, 0)
self._turn_counts.pop(session_id, None)
self._teacher_tasks.pop(session_id, None)
logger.info("[OpenClaw] session=%s done → cleaned up (effective_samples=%d)", session_id, eff)
_done_agent_id = self._session_agent_ids.pop(session_id, agent_id)
logger.info("[OpenClaw] session=%s agent=%s done → cleaned up (effective_samples=%d)", session_id, _done_agent_id, eff)
memory_turns = self._session_memory_turns.pop(session_id, [])
if memory_turns and self.memory_manager is not None and self.config.memory_auto_extract:
self._safe_create_task(
Expand All @@ -1320,7 +1344,7 @@ def _prompt_len(msgs):
self._session_memory_scopes.pop(session_id, None)
evolution_turns = self._session_turns.pop(session_id, [])
if evolution_turns and self.skill_evolver and self.config.enable_skill_evolution:
self._safe_create_task(self._evolve_skills_for_session(evolution_turns))
self._safe_create_task(self._evolve_skills_for_session(evolution_turns, agent_id=_done_agent_id))
elif session_done and self.config.memory_manual_trigger:
self._flush_pending_record(session_id, None)
self._maybe_submit_ready_samples(session_id, force_no_prm=True)
Expand Down Expand Up @@ -1375,6 +1399,9 @@ def _prompt_len(msgs):
session_id, turn_num, len(prompt_ids), len(response_ids),
)
self._buffer_record(session_id, turn_num, messages, prompt_text, response_text, tool_calls)
# Track agent_id for this session (RL path)
self._session_agent_ids[session_id] = agent_id

# Skill evolution + memory: buffer turns for all modes (RL and skills_only).
turn_entry = {"prompt_text": prompt_text, "response_text": response_text}
evolution_every_n = getattr(self.config, "skill_evolution_every_n_turns", 10)
Expand All @@ -1386,7 +1413,7 @@ def _prompt_len(msgs):
if len(buf) >= evolution_every_n:
evolution_turns = list(buf)
self._session_turns[session_id] = []
self._safe_create_task(self._evolve_skills_for_session(evolution_turns))
self._safe_create_task(self._evolve_skills_for_session(evolution_turns, agent_id=agent_id))
if _want_memory:
self._session_memory_turns.setdefault(session_id, []).append(turn_entry)
self._pending_turn_data.setdefault(session_id, {})[turn_num] = turn_data
Expand Down Expand Up @@ -1417,7 +1444,8 @@ def _prompt_len(msgs):
eff = self._session_effective.pop(session_id, 0)
self._turn_counts.pop(session_id, None)
self._teacher_tasks.pop(session_id, None)
logger.info("[OpenClaw] session=%s done → cleaned up (effective_samples=%d)", session_id, eff)
_done_agent_id = self._session_agent_ids.pop(session_id, agent_id)
logger.info("[OpenClaw] session=%s agent=%s done → cleaned up (effective_samples=%d)", session_id, _done_agent_id, eff)
# session done: trigger memory ingestion from dedicated memory buffer
memory_turns = self._session_memory_turns.pop(session_id, [])
if memory_turns and self.memory_manager is not None and self.config.memory_auto_extract:
Expand All @@ -1433,7 +1461,7 @@ def _prompt_len(msgs):
# session done: trigger skill evolution from skill buffer
evolution_turns = self._session_turns.pop(session_id, [])
if evolution_turns and self.skill_evolver and self.config.enable_skill_evolution:
self._safe_create_task(self._evolve_skills_for_session(evolution_turns))
self._safe_create_task(self._evolve_skills_for_session(evolution_turns, agent_id=_done_agent_id))
elif session_done and self.config.memory_manual_trigger:
# manual_trigger mode: clean up non-memory state, keep memory buffer for manual ingest
self._flush_pending_record(session_id, None)
Expand Down Expand Up @@ -1626,10 +1654,12 @@ async def _forward_to_llm(self, body: dict[str, Any]) -> dict[str, Any]:
# Skill evolution (skills_only mode) #
# ------------------------------------------------------------------ #

async def _evolve_skills_for_session(self, turns: list[dict]):
async def _evolve_skills_for_session(self, turns: list[dict], agent_id: str = "_default"):
"""Analyze session turns and generate new skills (skills_only mode)."""
from types import SimpleNamespace

agent_sm = self._get_agent_skill_manager(agent_id)

samples = [
SimpleNamespace(
prompt_text=t["prompt_text"],
Expand All @@ -1638,18 +1668,18 @@ async def _evolve_skills_for_session(self, turns: list[dict]):
)
for t in turns
]
existing = self.skill_manager.skills if self.skill_manager else {}
existing = agent_sm.skills if agent_sm else {}
try:
new_skills = await self.skill_evolver.evolve(samples, existing)
except Exception as e:
logger.error("[SkillEvolver] evolve failed: %s", e, exc_info=True)
return
if new_skills and self.skill_manager:
if new_skills and agent_sm:
added = 0
for skill in new_skills:
category = skill.get("category", "general")
added += self.skill_manager.add_skills([skill], category=category)
logger.info("[SkillEvolver] session analysis added %d new skills", added)
added += agent_sm.add_skills([skill], category=category)
logger.info("[SkillEvolver] agent=%s session analysis added %d new skills", agent_id, added)

# ------------------------------------------------------------------ #
# Skill injection #
Expand Down Expand Up @@ -1711,17 +1741,89 @@ def _prompt_len(msgs):
)
return result

def _inject_skills(self, messages: list[dict]) -> list[dict]:
"""Prepend skill guidance to the system message."""
def _get_agent_skill_manager(self, agent_id: str) -> Optional[SkillManager]:
    """Get or create a per-agent SkillManager.

    Loads skills from both ``_shared/`` and ``{agent_id}/`` subdirectories
    under the configured skills_dir. Falls back to the global skill_manager
    if per-agent isolation is not configured.

    Args:
        agent_id: Identifier of the agent (``"_default"`` when the caller
            supplied no ``X-Agent-Id`` header).

    Returns:
        A ``SkillManager`` scoped to the agent, the global ``skill_manager``
        as a fallback, or ``None`` when skills are disabled entirely.
    """
    # Skills disabled globally → nothing to scope per agent.
    if not self.skill_manager:
        return None

    # If no agents config or agent_id is _default, use global manager
    agents_config = getattr(self.config, "agents", {}) or {}
    if not agents_config and agent_id == "_default":
        return self.skill_manager

    # Check cache (one SkillManager per agent_id for the server's lifetime)
    if agent_id in self._agent_skill_managers:
        return self._agent_skill_managers[agent_id]

    # Build per-agent skill directory
    base_skills_dir = self.config.skills_dir
    agent_skills_dir = os.path.join(base_skills_dir, agent_id)
    shared_skills_dir = os.path.join(base_skills_dir, "_shared")

    # Ensure directories exist
    os.makedirs(agent_skills_dir, exist_ok=True)
    os.makedirs(shared_skills_dir, exist_ok=True)

    # Create a per-agent SkillManager from the agent's directory.
    # Retrieval settings are cloned from the global manager so per-agent
    # managers behave identically apart from their skill store.
    try:
        agent_sm = SkillManager(
            skills_dir=agent_skills_dir,
            retrieval_mode=self.skill_manager.retrieval_mode,
            embedding_model_path=self.skill_manager.embedding_model_path,
            task_specific_top_k=self.skill_manager.task_specific_top_k,
        )
    except FileNotFoundError:
        agent_sm = None

    # Also load shared skills and merge them in
    try:
        shared_sm = SkillManager(
            skills_dir=shared_skills_dir,
            retrieval_mode=self.skill_manager.retrieval_mode,
            embedding_model_path=self.skill_manager.embedding_model_path,
            task_specific_top_k=self.skill_manager.task_specific_top_k,
        )
        # Merge shared skills into agent's skill manager
        # NOTE(review): this uses ``add_skill`` while the rest of this module
        # uses ``add_skills([skill], category=...)`` — confirm both methods
        # exist on SkillManager and agree on persistence semantics (if
        # ``add_skill`` writes to disk, shared skills get copied into the
        # agent's directory on every cold start).
        if agent_sm:
            for skill in shared_sm.skills.get("general_skills", []):
                agent_sm.add_skill(skill)
            # NOTE(review): ``cat`` is discarded here, so task-specific
            # shared skills appear to lose their category when merged —
            # verify this is intended (cf. the evolution path, which
            # preserves ``skill["category"]``).
            for cat, skills in shared_sm.skills.get("task_specific_skills", {}).items():
                for skill in skills:
                    agent_sm.add_skill(skill)
            for skill in shared_sm.skills.get("common_mistakes", []):
                agent_sm.add_skill(skill)
        else:
            agent_sm = shared_sm
    except FileNotFoundError:
        pass

    if agent_sm is None:
        # NOTE(review): falling back to the *global* manager is cached under
        # this agent_id below, so later skill evolution for this agent would
        # write into the shared/global store — confirm that is acceptable.
        agent_sm = self.skill_manager  # ultimate fallback

    self._agent_skill_managers[agent_id] = agent_sm
    logger.info(
        "[SkillManager] created per-agent manager for '%s': %s",
        agent_id, agent_sm.get_skill_count(),
    )
    return agent_sm

def _inject_skills(self, messages: list[dict], skill_manager: Optional[SkillManager] = None) -> list[dict]:
"""Prepend skill guidance to the system message."""
sm = skill_manager or self.skill_manager
if not sm:
return messages

user_msgs = [m for m in messages if m.get("role") == "user"]
task_desc = _flatten_message_content(user_msgs[-1].get("content", "")) if user_msgs else ""
if not task_desc:
return messages

skills = self.skill_manager.retrieve(task_desc, top_k=self.config.skill_top_k)
skills = sm.retrieve(task_desc, top_k=self.config.skill_top_k)
if not skills:
return messages

Expand All @@ -1736,7 +1838,7 @@ def _inject_skills(self, messages: list[dict]) -> list[dict]:
", ".join(skill_names)[:400],
)

skill_text = self.skill_manager.format_for_conversation(skills)
skill_text = sm.format_for_conversation(skills)
messages = list(messages)

sys_indices = [i for i, m in enumerate(messages) if m.get("role") == "system"]
Expand Down Expand Up @@ -2112,6 +2214,7 @@ async def _inject_augmentation(
self,
messages: list[dict],
scope_id: str = "",
skill_manager: Optional[SkillManager] = None,
) -> list[dict]:
"""Coordinated injection of both Memory and Skill.

Expand All @@ -2122,7 +2225,8 @@ async def _inject_augmentation(
2. Content dedup — procedural observations that overlap with skills are dropped
3. Role-separated prompt template — LLM gets clear guidance on how to use each
"""
if not self.memory_manager or not self.skill_manager:
sm = skill_manager or self.skill_manager
if not self.memory_manager or not sm:
return messages

user_msgs = [m for m in messages if m.get("role") == "user"]
Expand All @@ -2135,7 +2239,7 @@ async def _inject_augmentation(
return messages

# --- 1. Retrieve relevant skills (for template customization, not injection)
skills = self.skill_manager.retrieve_relevant(
skills = sm.retrieve_relevant(
task_desc, top_k=min(self.config.skill_top_k, 5),
)

Expand All @@ -2152,7 +2256,7 @@ async def _inject_augmentation(
)

if not memories:
return self._inject_skills(messages)
return self._inject_skills(messages, skill_manager=sm)

# Dedup procedural memories that overlap with matched skills.
memories = self._dedup_memory_against_skills(
Expand All @@ -2161,7 +2265,7 @@ async def _inject_augmentation(

memory_text = self.memory_manager.render_for_prompt(memories)
if not memory_text:
return self._inject_skills(messages)
return self._inject_skills(messages, skill_manager=sm)

# --- 3. Build skill-aware structured template (no full skill injection) ---
# Extract compact process steps from matched skills.
Expand Down
14 changes: 14 additions & 0 deletions metaclaw/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,20 @@ class MetaClawConfig:
bedrock_region: str = "us-east-1"
skill_evolution_history_path: str = "memory_data/skills/evolution_history.jsonl"

# ------------------------------------------------------------------ #
# Per-Agent Isolation #
# ------------------------------------------------------------------ #
# Map agent_id → per-agent overrides. Keys: mode, skills_dir, lora_output.
# Missing agents fall back to top-level config.
# Example YAML:
# agents:
# doctor:
# mode: rl
# lora_output: ~/.metaclaw/lora/doctor/
# main:
# mode: skills_only
agents: dict = field(default_factory=dict)

# ------------------------------------------------------------------ #
# WeChat (official openclaw-weixin plugin, auto-installed) #
# ------------------------------------------------------------------ #
Expand Down