diff --git a/extensions/metaclaw-openclaw/index.ts b/extensions/metaclaw-openclaw/index.ts
index f4cdfff7..edb24652 100644
--- a/extensions/metaclaw-openclaw/index.ts
+++ b/extensions/metaclaw-openclaw/index.ts
@@ -152,9 +152,11 @@ function patchFetchForTrainingHeaders(
   api.on("before_prompt_build", (_event, ctx) => {
     const sessionId = ctx.sessionId ?? "";
     const turnType = SIDE_TRIGGERS.has(ctx.trigger ?? "") ? "side" : "main";
+    const agentId = ctx.agentId ?? "";
     pendingHeaders = {
       [config.sessionIdHeader]: sessionId,
       [config.turnTypeHeader]: turnType,
+      "X-Agent-Id": agentId,
     };
     return {};
   });
diff --git a/metaclaw/api_server.py b/metaclaw/api_server.py
index 8443d5ba..71183745 100644
--- a/metaclaw/api_server.py
+++ b/metaclaw/api_server.py
@@ -540,6 +540,11 @@ def __init__(
         # Buffer turns per session for memory ingestion (only cleared on session_done)
         self._session_memory_turns: dict[str, list] = {}
         self._session_memory_scopes: dict[str, str] = {}
+        # Track agent_id per session for evolution routing
+        self._session_agent_ids: dict[str, str] = {}
+
+        # Per-agent SkillManager cache: agent_id → SkillManager
+        self._agent_skill_managers: dict[str, SkillManager] = {}
 
         # OPD teacher model client
         self._teacher_client: Optional[OpenAI] = None
@@ -622,6 +627,7 @@ async def chat_completions(
     x_memory_scope: Optional[str] = Header(default=None),
     x_user_id: Optional[str] = Header(default=None),
    x_workspace_id: Optional[str] = Header(default=None),
+    x_agent_id: Optional[str] = Header(default=None),
 ):
     owner: MetaClawAPIServer = request.app.state.owner
     # Update idle tracker so the scheduler knows the user is active
@@ -680,6 +686,8 @@ async def chat_completions(
         workspace_id=_explicit_workspace,
     )
 
+    agent_id = (x_agent_id or "").strip() or "_default"
+
     stream = bool(body.get("stream", False))
     result = await owner._handle_request(
         body,
@@ -687,6 +695,7 @@ async def chat_completions(
         turn_type=turn_type,
         session_done=session_done,
         memory_scope=memory_scope,
+        agent_id=agent_id,
     )
     if stream:
         return StreamingResponse(
@@ -1144,6 +1153,7 @@ async def _handle_request(
         turn_type: str,
         session_done: bool,
         memory_scope: str = "",
+        agent_id: str = "_default",
     ) -> dict[str, Any]:
         messages = body.get("messages")
         if not isinstance(messages, list) or not messages:
@@ -1173,7 +1183,10 @@ def _prompt_len(msgs):
         cached_system = ""
         # NOTE: In skills_only mode we forward directly to the user's LLM provider.
         # Do not rewrite/collapse the system prompt here.
-        if self.config.mode != "skills_only":
+        # Resolve per-agent mode for system prompt handling
+        _agent_cfg_sp = (getattr(self.config, "agents", None) or {}).get(agent_id, {})
+        _effective_mode_sp = _agent_cfg_sp.get("mode", self.config.mode) if isinstance(_agent_cfg_sp, dict) else self.config.mode
+        if _effective_mode_sp != "skills_only":
             cached_system = self._read_cached_system_prompt()
             if not cached_system:
                 raw_system = ""
@@ -1200,20 +1213,23 @@ def _prompt_len(msgs):
         if effective_memory_scope:
             self._session_memory_scopes[session_id] = effective_memory_scope
 
+        # Resolve per-agent skill manager
+        agent_sm = self._get_agent_skill_manager(agent_id)
+
         # Inject memory and skills into system message for main turns
         if turn_type == "main":
             if (
                 self.memory_manager
-                and self.skill_manager
+                and agent_sm
                 and self.config.synergy_enabled
             ):
                 messages = await self._inject_augmentation(
-                    messages, scope_id=effective_memory_scope,
+                    messages, scope_id=effective_memory_scope, skill_manager=agent_sm,
                 )
             elif self.memory_manager:
                 messages = await self._inject_memory(messages, scope_id=effective_memory_scope)
-            elif self.skill_manager:
-                messages = self._inject_skills(messages)
+            elif agent_sm:
+                messages = self._inject_skills(messages, skill_manager=agent_sm)
         if cached_system:
             logger.info(
                 "[OpenClaw] system prompt cached len=%d",
@@ -1234,7 +1250,11 @@ def _prompt_len(msgs):
         forward_body["model"] = self._served_model
         forward_body["messages"] = _ensure_reasoning_content(messages)
 
-        if self.config.mode == "skills_only":
+        # Resolve per-agent mode: check agents config, fall back to global mode
+        _agent_cfg = (getattr(self.config, "agents", None) or {}).get(agent_id, {})
+        _effective_mode = _agent_cfg.get("mode", self.config.mode) if isinstance(_agent_cfg, dict) else self.config.mode
+
+        if _effective_mode == "skills_only":
             output = await self._forward_to_llm(forward_body)
         else:
             output = await self._forward_to_tinker(forward_body)
@@ -1287,6 +1307,9 @@ def _prompt_len(msgs):
                 "prompt_text": prompt_text_simple,
                 "response_text": response_text_simple,
             }
+            # Track agent_id for this session
+            self._session_agent_ids[session_id] = agent_id
+
             evolution_every_n = getattr(self.config, "skill_evolution_every_n_turns", 10)
             _want_evolution = self.skill_evolver and self.config.enable_skill_evolution and evolution_every_n > 0
             _want_memory = self.memory_manager is not None
@@ -1296,7 +1319,7 @@ def _prompt_len(msgs):
                 if len(buf) >= evolution_every_n:
                     evolution_turns = list(buf)
                     self._session_turns[session_id] = []
-                    self._safe_create_task(self._evolve_skills_for_session(evolution_turns))
+                    self._safe_create_task(self._evolve_skills_for_session(evolution_turns, agent_id=agent_id))
             if _want_memory:
                 self._session_memory_turns.setdefault(session_id, []).append(turn_entry)
             # session_done handling for skills_only path (tokenizer unavailable)
@@ -1306,7 +1329,8 @@ def _prompt_len(msgs):
                 eff = self._session_effective.pop(session_id, 0)
                 self._turn_counts.pop(session_id, None)
                 self._teacher_tasks.pop(session_id, None)
-                logger.info("[OpenClaw] session=%s done → cleaned up (effective_samples=%d)", session_id, eff)
+                _done_agent_id = self._session_agent_ids.pop(session_id, agent_id)
+                logger.info("[OpenClaw] session=%s agent=%s done → cleaned up (effective_samples=%d)", session_id, _done_agent_id, eff)
                 memory_turns = self._session_memory_turns.pop(session_id, [])
                 if memory_turns and self.memory_manager is not None and self.config.memory_auto_extract:
                     self._safe_create_task(
@@ -1320,7 +1344,7 @@ def _prompt_len(msgs):
                 self._session_memory_scopes.pop(session_id, None)
                 evolution_turns = self._session_turns.pop(session_id, [])
                 if evolution_turns and self.skill_evolver and self.config.enable_skill_evolution:
-                    self._safe_create_task(self._evolve_skills_for_session(evolution_turns))
+                    self._safe_create_task(self._evolve_skills_for_session(evolution_turns, agent_id=_done_agent_id))
             elif session_done and self.config.memory_manual_trigger:
                 self._flush_pending_record(session_id, None)
                 self._maybe_submit_ready_samples(session_id, force_no_prm=True)
@@ -1375,6 +1399,9 @@ def _prompt_len(msgs):
             session_id, turn_num, len(prompt_ids), len(response_ids),
         )
         self._buffer_record(session_id, turn_num, messages, prompt_text, response_text, tool_calls)
+        # Track agent_id for this session (RL path)
+        self._session_agent_ids[session_id] = agent_id
+
         # Skill evolution + memory: buffer turns for all modes (RL and skills_only).
         turn_entry = {"prompt_text": prompt_text, "response_text": response_text}
         evolution_every_n = getattr(self.config, "skill_evolution_every_n_turns", 10)
@@ -1386,7 +1413,7 @@ def _prompt_len(msgs):
             if len(buf) >= evolution_every_n:
                 evolution_turns = list(buf)
                 self._session_turns[session_id] = []
-                self._safe_create_task(self._evolve_skills_for_session(evolution_turns))
+                self._safe_create_task(self._evolve_skills_for_session(evolution_turns, agent_id=agent_id))
         if _want_memory:
             self._session_memory_turns.setdefault(session_id, []).append(turn_entry)
         self._pending_turn_data.setdefault(session_id, {})[turn_num] = turn_data
@@ -1417,7 +1444,8 @@ def _prompt_len(msgs):
             eff = self._session_effective.pop(session_id, 0)
             self._turn_counts.pop(session_id, None)
             self._teacher_tasks.pop(session_id, None)
-            logger.info("[OpenClaw] session=%s done → cleaned up (effective_samples=%d)", session_id, eff)
+            _done_agent_id = self._session_agent_ids.pop(session_id, agent_id)
+            logger.info("[OpenClaw] session=%s agent=%s done → cleaned up (effective_samples=%d)", session_id, _done_agent_id, eff)
             # session done: trigger memory ingestion from dedicated memory buffer
             memory_turns = self._session_memory_turns.pop(session_id, [])
             if memory_turns and self.memory_manager is not None and self.config.memory_auto_extract:
@@ -1433,7 +1461,7 @@ def _prompt_len(msgs):
             # session done: trigger skill evolution from skill buffer
             evolution_turns = self._session_turns.pop(session_id, [])
             if evolution_turns and self.skill_evolver and self.config.enable_skill_evolution:
-                self._safe_create_task(self._evolve_skills_for_session(evolution_turns))
+                self._safe_create_task(self._evolve_skills_for_session(evolution_turns, agent_id=_done_agent_id))
         elif session_done and self.config.memory_manual_trigger:
             # manual_trigger mode: clean up non-memory state, keep memory buffer for manual ingest
             self._flush_pending_record(session_id, None)
@@ -1626,10 +1654,12 @@ async def _forward_to_llm(self, body: dict[str, Any]) -> dict[str, Any]:
     # Skill evolution (skills_only mode)                                #
     # ------------------------------------------------------------------ #
 
-    async def _evolve_skills_for_session(self, turns: list[dict]):
+    async def _evolve_skills_for_session(self, turns: list[dict], agent_id: str = "_default"):
         """Analyze session turns and generate new skills (skills_only mode)."""
         from types import SimpleNamespace
 
+        agent_sm = self._get_agent_skill_manager(agent_id)
+
         samples = [
             SimpleNamespace(
                 prompt_text=t["prompt_text"],
@@ -1638,18 +1668,18 @@ async def _evolve_skills_for_session(self, turns: list[dict]):
             )
             for t in turns
         ]
-        existing = self.skill_manager.skills if self.skill_manager else {}
+        existing = agent_sm.skills if agent_sm else {}
         try:
             new_skills = await self.skill_evolver.evolve(samples, existing)
         except Exception as e:
             logger.error("[SkillEvolver] evolve failed: %s", e, exc_info=True)
             return
-        if new_skills and self.skill_manager:
+        if new_skills and agent_sm:
             added = 0
             for skill in new_skills:
                 category = skill.get("category", "general")
-                added += self.skill_manager.add_skills([skill], category=category)
-            logger.info("[SkillEvolver] session analysis added %d new skills", added)
+                added += agent_sm.add_skills([skill], category=category)
+            logger.info("[SkillEvolver] agent=%s session analysis added %d new skills", agent_id, added)
 
     # ------------------------------------------------------------------ #
     # Skill injection                                                    #
@@ -1711,9 +1741,81 @@ def _prompt_len(msgs):
         )
         return result
 
-    def _inject_skills(self, messages: list[dict]) -> list[dict]:
-        """Prepend skill guidance to the system message."""
+    def _get_agent_skill_manager(self, agent_id: str) -> Optional[SkillManager]:
+        """Get or create a per-agent SkillManager.
+
+        Loads skills from both ``_shared/`` and ``{agent_id}/`` subdirectories
+        under the configured skills_dir. Falls back to the global skill_manager
+        if per-agent isolation is not configured.
+        """
         if not self.skill_manager:
+            return None
+
+        # If no agents config and agent_id is _default, use the global manager
+        agents_config = getattr(self.config, "agents", {}) or {}
+        if not agents_config and agent_id == "_default":
+            return self.skill_manager
+
+        # Check cache
+        if agent_id in self._agent_skill_managers:
+            return self._agent_skill_managers[agent_id]
+
+        # Build per-agent skill directory
+        base_skills_dir = self.config.skills_dir
+        agent_skills_dir = os.path.join(base_skills_dir, agent_id)
+        shared_skills_dir = os.path.join(base_skills_dir, "_shared")
+
+        # Ensure directories exist
+        os.makedirs(agent_skills_dir, exist_ok=True)
+        os.makedirs(shared_skills_dir, exist_ok=True)
+
+        # Create a per-agent SkillManager from the agent's directory
+        try:
+            agent_sm = SkillManager(
+                skills_dir=agent_skills_dir,
+                retrieval_mode=self.skill_manager.retrieval_mode,
+                embedding_model_path=self.skill_manager.embedding_model_path,
+                task_specific_top_k=self.skill_manager.task_specific_top_k,
+            )
+        except FileNotFoundError:
+            agent_sm = None
+
+        # Also load shared skills and merge them in
+        try:
+            shared_sm = SkillManager(
+                skills_dir=shared_skills_dir,
+                retrieval_mode=self.skill_manager.retrieval_mode,
+                embedding_model_path=self.skill_manager.embedding_model_path,
+                task_specific_top_k=self.skill_manager.task_specific_top_k,
+            )
+            # Merge shared skills via add_skills(), keeping each bucket's category
+            if agent_sm:
+                for skill in shared_sm.skills.get("general_skills", []):
+                    agent_sm.add_skills([skill], category="general")
+                for cat, skills in shared_sm.skills.get("task_specific_skills", {}).items():
+                    for skill in skills:
+                        agent_sm.add_skills([skill], category=cat)
+                for skill in shared_sm.skills.get("common_mistakes", []):
+                    agent_sm.add_skills([skill], category="common_mistakes")
+            else:
+                agent_sm = shared_sm
+        except FileNotFoundError:
+            pass
+
+        if agent_sm is None:
+            agent_sm = self.skill_manager  # ultimate fallback
+
+        self._agent_skill_managers[agent_id] = agent_sm
+        logger.info(
+            "[SkillManager] created per-agent manager for '%s': %s",
+            agent_id, agent_sm.get_skill_count(),
+        )
+        return agent_sm
+
+    def _inject_skills(self, messages: list[dict], skill_manager: Optional[SkillManager] = None) -> list[dict]:
+        """Prepend skill guidance to the system message."""
+        sm = skill_manager or self.skill_manager
+        if not sm:
             return messages
 
         user_msgs = [m for m in messages if m.get("role") == "user"]
@@ -1721,7 +1823,7 @@ def _inject_skills(self, messages: list[dict]) -> list[dict]:
         if not task_desc:
             return messages
 
-        skills = self.skill_manager.retrieve(task_desc, top_k=self.config.skill_top_k)
+        skills = sm.retrieve(task_desc, top_k=self.config.skill_top_k)
         if not skills:
             return messages
 
@@ -1736,7 +1838,7 @@ def _inject_skills(self, messages: list[dict]) -> list[dict]:
             ", ".join(skill_names)[:400],
         )
 
-        skill_text = self.skill_manager.format_for_conversation(skills)
+        skill_text = sm.format_for_conversation(skills)
         messages = list(messages)
         sys_indices = [i for i, m in enumerate(messages) if m.get("role") == "system"]
@@ -2112,6 +2214,7 @@ async def _inject_augmentation(
         self,
         messages: list[dict],
         scope_id: str = "",
+        skill_manager: Optional[SkillManager] = None,
     ) -> list[dict]:
         """Coordinated injection of both Memory and Skill.
 
@@ -2122,7 +2225,8 @@ async def _inject_augmentation(
         2. Content dedup — procedural observations that overlap with skills are dropped
         3. Role-separated prompt template — LLM gets clear guidance on how to use each
         """
-        if not self.memory_manager or not self.skill_manager:
+        sm = skill_manager or self.skill_manager
+        if not self.memory_manager or not sm:
             return messages
 
         user_msgs = [m for m in messages if m.get("role") == "user"]
@@ -2135,7 +2239,7 @@ async def _inject_augmentation(
             return messages
 
         # --- 1. Retrieve relevant skills (for template customization, not injection)
-        skills = self.skill_manager.retrieve_relevant(
+        skills = sm.retrieve_relevant(
             task_desc,
             top_k=min(self.config.skill_top_k, 5),
         )
@@ -2152,7 +2256,7 @@ async def _inject_augmentation(
         )
 
         if not memories:
-            return self._inject_skills(messages)
+            return self._inject_skills(messages, skill_manager=sm)
 
         # Dedup procedural memories that overlap with matched skills.
         memories = self._dedup_memory_against_skills(
@@ -2161,7 +2265,7 @@ async def _inject_augmentation(
         memory_text = self.memory_manager.render_for_prompt(memories)
         if not memory_text:
-            return self._inject_skills(messages)
+            return self._inject_skills(messages, skill_manager=sm)
 
         # --- 3. Build skill-aware structured template (no full skill injection) ---
         # Extract compact process steps from matched skills.
diff --git a/metaclaw/config.py b/metaclaw/config.py
index 63516af5..47a428ac 100644
--- a/metaclaw/config.py
+++ b/metaclaw/config.py
@@ -167,6 +167,20 @@ class MetaClawConfig:
     bedrock_region: str = "us-east-1"
     skill_evolution_history_path: str = "memory_data/skills/evolution_history.jsonl"
 
+    # ------------------------------------------------------------------ #
+    # Per-Agent Isolation                                                #
+    # ------------------------------------------------------------------ #
+    # Map agent_id → per-agent overrides. Keys: mode, skills_dir, lora_output.
+    # Missing agents fall back to top-level config.
+    # Example YAML:
+    #   agents:
+    #     doctor:
+    #       mode: rl
+    #       lora_output: ~/.metaclaw/lora/doctor/
+    #     main:
+    #       mode: skills_only
+    agents: dict = field(default_factory=dict)
+
     # ------------------------------------------------------------------ #
     # WeChat (official openclaw-weixin plugin, auto-installed)           #
     # ------------------------------------------------------------------ #
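
Usage sketch: how a client exercises the new per-agent routing end to end. This is a minimal example assuming an OpenAI-compatible deployment of the MetaClaw API server; the base URL, API key, served model name, and the "doctor" agent are illustrative placeholders, not values taken from this diff. Only the X-Agent-Id header name comes from the change above.

    # Minimal sketch (assumed deployment values; only the header name is from the diff).
    from openai import OpenAI

    client = OpenAI(base_url="http://localhost:8000/v1", api_key="unused")

    resp = client.chat.completions.create(
        model="metaclaw",  # placeholder served model name
        messages=[{"role": "user", "content": "Summarize the latest intake notes."}],
        # extra_headers is openai-python's escape hatch for custom HTTP headers.
        extra_headers={"X-Agent-Id": "doctor"},
    )
    print(resp.choices[0].message.content)

Server-side, the request resolves to _get_agent_skill_manager("doctor"), which reads skills from {skills_dir}/doctor/ merged with {skills_dir}/_shared/. A missing or empty header falls back to the "_default" agent, which keeps using the global skill manager when no agents config is present.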