From 5fa674b91ca65eef984f892c546cffa034ea3af6 Mon Sep 17 00:00:00 2001 From: "alexzhang2014@live.com" Date: Wed, 18 Mar 2026 02:35:47 +0800 Subject: [PATCH] feat(worker): add verify stage and harness closed-loop pipeline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a new "verify" agent role for build/lint/type-check validation, with automatic failure redirect context passing (verify/test → code) so the coding agent can self-correct. Includes composite evaluator scoring from LLM confidence + external signals, project-level verify_commands config, tech_stack auto-detection, and a built-in "harness_pipeline" template (Code→Verify→Test with up to 3 retry loops). Co-Authored-By: Claude Opus 4.6 (1M context) --- platform/app/config.py | 2 + platform/app/models/project.py | 1 + platform/app/schemas/project.py | 3 + platform/app/schemas/template.py | 2 + platform/app/services/template_service.py | 15 ++ platform/app/worker/agents.py | 3 + platform/app/worker/engine.py | 93 ++++++++- platform/app/worker/evaluator.py | 127 ++++++++++++ platform/app/worker/executor.py | 4 + platform/app/worker/prompts.py | 33 ++++ platform/tests/test_evaluator.py | 144 ++++++++++++++ platform/tests/test_harness_loop.py | 229 ++++++++++++++++++++++ platform/tests/test_verify_stage.py | 176 +++++++++++++++++ platform/tests/test_worker_graph.py | 31 +++ 14 files changed, 861 insertions(+), 2 deletions(-) create mode 100644 platform/app/worker/evaluator.py create mode 100644 platform/tests/test_evaluator.py create mode 100644 platform/tests/test_harness_loop.py create mode 100644 platform/tests/test_verify_stage.py diff --git a/platform/app/config.py b/platform/app/config.py index 8e0a8f1..3b4cf99 100644 --- a/platform/app/config.py +++ b/platform/app/config.py @@ -112,6 +112,8 @@ class Settings(BaseSettings): CONDITIONS_ENABLED: bool = True EVALUATOR_DEFAULT_MIN_CONFIDENCE: float = 0.7 EVALUATOR_MAX_ITERATIONS: int = 3 + EVALUATOR_USE_EXTERNAL_SIGNALS: bool = True + EVALUATOR_COMPOSITE_WEIGHTS: str = '{"test_pass_rate": 0.4, "build_success": 0.2, "lint_clean": 0.1, "llm_confidence": 0.3}' DYNAMIC_GATE_ENABLED: bool = False DYNAMIC_GATE_CONFIDENCE_THRESHOLD: float = 0.5 STAGE_DEFAULT_MAX_RETRIES: int = 3 diff --git a/platform/app/models/project.py b/platform/app/models/project.py index 9f8738e..34c59dc 100644 --- a/platform/app/models/project.py +++ b/platform/app/models/project.py @@ -26,6 +26,7 @@ class ProjectModel(Base): tech_stack: Mapped[Optional[list]] = mapped_column(JSON, nullable=True) repo_tree: Mapped[Optional[str]] = mapped_column(Text, nullable=True) sandbox_image: Mapped[Optional[str]] = mapped_column(String(200), nullable=True) + verify_commands: Mapped[Optional[str]] = mapped_column(Text, nullable=True, default=None) last_synced_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True), nullable=True) created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), nullable=False, server_default=func.now() diff --git a/platform/app/schemas/project.py b/platform/app/schemas/project.py index f003a94..3c273a9 100644 --- a/platform/app/schemas/project.py +++ b/platform/app/schemas/project.py @@ -14,6 +14,7 @@ class ProjectCreateRequest(BaseModel): branch: str = "main" description: Optional[str] = None sandbox_image: Optional[str] = None + verify_commands: Optional[List[str]] = None class ProjectUpdateRequest(BaseModel): @@ -24,6 +25,7 @@ class ProjectUpdateRequest(BaseModel): description: Optional[str] = None status: Optional[str] = None sandbox_image: Optional[str] = None + verify_commands: Optional[List[str]] = None class ProjectResponse(BaseModel): @@ -38,6 +40,7 @@ class ProjectResponse(BaseModel): tech_stack: Optional[List[str]] = None repo_tree: Optional[str] = None sandbox_image: Optional[str] = None + verify_commands: Optional[List[str]] = None last_synced_at: Optional[datetime] = None created_at: datetime updated_at: datetime diff --git a/platform/app/schemas/template.py b/platform/app/schemas/template.py index 2178a37..7f9173e 100644 --- a/platform/app/schemas/template.py +++ b/platform/app/schemas/template.py @@ -27,6 +27,8 @@ class StageDefinition(BaseModel): depends_on: Optional[List[str]] = None on_failure: Optional[str] = None max_executions: int = 1 + # Harness: verify commands for verify stages + verify_commands: Optional[List[str]] = None # Phase 3.3: Dynamic routing routing: Optional[dict] = None diff --git a/platform/app/services/template_service.py b/platform/app/services/template_service.py index 06948d7..21bf0ae 100644 --- a/platform/app/services/template_service.py +++ b/platform/app/services/template_service.py @@ -78,6 +78,21 @@ {"after_stage": "review", "type": "human_approve"}, ], }, + { + "name": "harness_pipeline", + "display_name": "闭环流水线", + "description": "Code→Verify→Test 自动闭环流水线,验证/测试失败时自动回退重试,最多3轮", + "stages": [ + {"name": "parse", "agent_role": "orchestrator", "order": 0, "depends_on": []}, + {"name": "spec", "agent_role": "spec", "order": 1, "depends_on": ["parse"]}, + {"name": "code", "agent_role": "coding", "order": 2, "depends_on": ["spec"], "max_executions": 3}, + {"name": "verify", "agent_role": "verify", "order": 3, "depends_on": ["code"], "on_failure": "code", "max_executions": 3}, + {"name": "test", "agent_role": "test", "order": 4, "depends_on": ["verify"], "on_failure": "code", "max_executions": 3}, + {"name": "review", "agent_role": "review", "order": 5, "depends_on": ["test"]}, + {"name": "signoff", "agent_role": "orchestrator", "order": 6, "depends_on": ["review"]}, + ], + "gates": [], + }, { "name": "custom", "display_name": "自定义", diff --git a/platform/app/worker/agents.py b/platform/app/worker/agents.py index 8ec75e7..7abcb48 100644 --- a/platform/app/worker/agents.py +++ b/platform/app/worker/agents.py @@ -33,6 +33,7 @@ "coding": 8, "doc": 5, "test": 8, + "verify": 3, } _DEFAULT_MAX_TURNS = 5 @@ -45,6 +46,7 @@ "review": {"read", "execute", "skill"}, "smoke": {"read", "execute", "skill"}, "doc": {"read", "write", "edit", "skill"}, + "verify": {"execute", "read"}, } _ALL_TOOLS: set[str] = set() _TOOL_ARGUMENT_HINTS: dict[str, str] = {} @@ -62,6 +64,7 @@ "review": ["shared", "review"], "smoke": ["shared", "smoke"], "doc": ["shared", "doc"], + "verify": ["shared"], } diff --git a/platform/app/worker/engine.py b/platform/app/worker/engine.py index b322854..c128924 100644 --- a/platform/app/worker/engine.py +++ b/platform/app/worker/engine.py @@ -1191,8 +1191,9 @@ async def _process_task(session: AsyncSession, task: TaskModel) -> None: project_memory_store = ProjectMemoryStore(str(task.project_id)) except Exception: logger.warning("Failed to init memory store for project %s", task.project_id, exc_info=True) - # Phase 3.1: Graph-based execution when enabled - if settings.GRAPH_EXECUTION_ENABLED and task.template: + # Phase 3.1: Graph-based execution when enabled or when template uses graph features + use_graph = settings.GRAPH_EXECUTION_ENABLED or _template_needs_graph(task) + if use_graph and task.template: await _process_task_graph( session, task, sorted_stages, stage_defs, gates, prior_outputs, compression, structured_outputs, @@ -1440,6 +1441,70 @@ async def _process_task(session: AsyncSession, task: TaskModel) -> None: ) +def _template_needs_graph(task: TaskModel) -> bool: + """Check if a task's template uses graph features (depends_on / on_failure).""" + if not task.template: + return False + try: + stages_raw = task.template.stages + if isinstance(stages_raw, str): + stages_raw = json.loads(stages_raw) + if not isinstance(stages_raw, list): + return False + for s in stages_raw: + if not isinstance(s, dict): + continue + if s.get("depends_on") or s.get("on_failure"): + return True + except Exception: + pass + return False + + +def _resolve_verify_commands( + task: TaskModel, + stage_defs: Dict[str, dict], +) -> Optional[List[str]]: + """Resolve verify commands by priority: stage def > project > tech_stack auto-detect.""" + # 1. Stage-level verify_commands + verify_def = stage_defs.get("verify", {}) + stage_cmds = verify_def.get("verify_commands") + if stage_cmds: + return stage_cmds + + # 2. Project-level verify_commands + if task.project: + raw = getattr(task.project, "verify_commands", None) + if raw: + try: + cmds = json.loads(raw) if isinstance(raw, str) else raw + if isinstance(cmds, list) and cmds: + return cmds + except (json.JSONDecodeError, TypeError): + pass + + # 3. Auto-detect from tech_stack + if task.project: + tech_stack = getattr(task.project, "tech_stack", None) or [] + if isinstance(tech_stack, str): + try: + tech_stack = json.loads(tech_stack) + except (json.JSONDecodeError, TypeError): + tech_stack = [] + tech_lower = [str(t).lower() for t in tech_stack] + cmds = [] + if any(t in tech_lower for t in ("typescript", "nextjs", "react", "vue", "angular")): + cmds.append("npx tsc --noEmit") + cmds.append("npm run lint --if-present") + if any(t in tech_lower for t in ("python", "fastapi", "django", "flask")): + cmds.append("ruff check .") + cmds.append("python -m py_compile *.py") + if cmds: + return cmds + + return None + + async def _process_task_graph( session: AsyncSession, task: TaskModel, @@ -1515,6 +1580,9 @@ async def _process_task_graph( skipped.add(stage.stage_name) execution_counts[stage.stage_name] = stage.execution_count + # Harness: failure redirect context channel — pass error info to redirect target + _pending_redirect_contexts: Dict[str, Dict[str, str]] = {} + max_iterations = settings.GRAPH_MAX_LOOP_ITERATIONS * len(graph.nodes) iteration = 0 @@ -1564,11 +1632,15 @@ async def _process_task_graph( stage.execution_count = execution_counts[node.name] await session.commit() + # Consume pending redirect context for this stage + redirect_ctx = _pending_redirect_contexts.pop(node.name, None) + result = await _execute_single_stage( session, task, stage, stage_index, prior_outputs, compression, project_memory_store, repo_context, stage_defs, workspace_path, sandbox_info, sandbox_required_error=sandbox_required_error, + failure_redirect_context=redirect_ctx, ) if result is None: failed.add(node.name) @@ -1578,6 +1650,12 @@ async def _process_task_graph( logger.info( "Stage %s failed, redirecting to %s", node.name, redirect, ) + # Capture failure context before resetting + _pending_redirect_contexts[redirect] = { + "failed_stage": node.name, + "error": stage.error_message or "unknown error", + "output": (stage.output_summary or "")[:2000], + } # Reset the redirect target for re-execution redirect_stage = stage_map[redirect] redirect_stage.status = "pending" @@ -1682,6 +1760,7 @@ async def _execute_single_stage( sandbox_info=None, gate_rejection_context: Optional[Dict[str, str]] = None, sandbox_required_error: Optional[str] = None, + failure_redirect_context: Optional[Dict[str, str]] = None, ) -> Optional[str]: """Execute a single stage with model routing and retry context. @@ -1694,6 +1773,14 @@ async def _execute_single_stage( stage_timeout = sdef.get("timeout") # Phase 1.4 evaluator_config = sdef.get("evaluator") # Phase 2.2 + # Harness: auto-inject verify commands as custom_instruction for verify stages + if stage.agent_role == "verify" and not custom_instruction: + verify_cmds = _resolve_verify_commands(task, stage_defs) + if verify_cmds: + custom_instruction = "请依次执行以下验证命令:\n" + "\n".join( + f"- `{cmd}`" for cmd in verify_cmds + ) + # Build project memory for the current role project_memory: Optional[str] = None if project_memory_store: @@ -1820,6 +1907,7 @@ async def _execute_single_stage( stage_model=stage_model, custom_instruction=custom_instruction, gate_rejection_context=gate_rejection_context, + failure_redirect_context=failure_redirect_context, ) else: output = await execute_stage( @@ -1834,6 +1922,7 @@ async def _execute_single_stage( gate_rejection_context=gate_rejection_context, stage_timeout=stage_timeout, evaluator_config=evaluator_config, + failure_redirect_context=failure_redirect_context, ) return output except Exception as e: diff --git a/platform/app/worker/evaluator.py b/platform/app/worker/evaluator.py new file mode 100644 index 0000000..262e764 --- /dev/null +++ b/platform/app/worker/evaluator.py @@ -0,0 +1,127 @@ +"""External evaluator: composite scoring from LLM confidence + external signals.""" +from __future__ import annotations + +import json +import logging +from dataclasses import dataclass, field +from typing import Any, Dict, Optional + +from app.config import settings + +logger = logging.getLogger(__name__) + +DEFAULT_WEIGHTS: Dict[str, float] = { + "llm_confidence": 0.3, + "test_pass_rate": 0.4, + "build_success": 0.2, + "lint_clean": 0.1, +} + + +@dataclass +class EvaluationResult: + composite_score: float # 0.0 - 1.0 + llm_confidence: float + external_signals: Dict[str, Any] = field(default_factory=dict) + passed: bool = False + details: str = "" + + +def _load_weights() -> Dict[str, float]: + """Load composite weights from config, falling back to defaults.""" + try: + parsed = json.loads(settings.EVALUATOR_COMPOSITE_WEIGHTS) + if isinstance(parsed, dict): + return parsed + except (json.JSONDecodeError, TypeError): + pass + return dict(DEFAULT_WEIGHTS) + + +def compute_composite_score( + llm_confidence: float, + external_signals: Dict[str, Any], + weights: Optional[Dict[str, float]] = None, +) -> float: + """Compute weighted composite score from LLM confidence and external signals. + + Args: + llm_confidence: LLM self-assessed confidence (0.0 - 1.0). + external_signals: Dict with optional keys: test_pass_rate (float 0-1), + build_success (bool), lint_clean (bool). + weights: Optional custom weights. If None, uses config/defaults. + + Returns: + Composite score between 0.0 and 1.0. + """ + if weights is None: + weights = _load_weights() + + # If no external signals, fall back to pure LLM confidence + if not external_signals: + return max(0.0, min(1.0, llm_confidence)) + + score = 0.0 + total_weight = 0.0 + + # LLM confidence + w = weights.get("llm_confidence", 0.3) + score += w * max(0.0, min(1.0, llm_confidence)) + total_weight += w + + # Test pass rate (float 0-1) + if "test_pass_rate" in external_signals: + w = weights.get("test_pass_rate", 0.4) + rate = float(external_signals["test_pass_rate"]) + score += w * max(0.0, min(1.0, rate)) + total_weight += w + + # Build success (bool → 1.0 or 0.0) + if "build_success" in external_signals: + w = weights.get("build_success", 0.2) + score += w * (1.0 if external_signals["build_success"] else 0.0) + total_weight += w + + # Lint clean (bool → 1.0 or 0.0) + if "lint_clean" in external_signals: + w = weights.get("lint_clean", 0.1) + score += w * (1.0 if external_signals["lint_clean"] else 0.0) + total_weight += w + + if total_weight <= 0: + return max(0.0, min(1.0, llm_confidence)) + + return max(0.0, min(1.0, score / total_weight * (sum(weights.values()) / total_weight) + if total_weight != sum(weights.values()) else score)) + + +def extract_signals_from_stage_outputs( + structured_outputs: Dict[str, dict], +) -> Dict[str, Any]: + """Extract external signals from verify/test stage structured outputs. + + Looks for known keys in stage output_structured fields: + - verify stage: build_success (bool), lint_clean (bool) + - test stage: test_pass_rate (float), tests_passed (int), tests_total (int) + """ + signals: Dict[str, Any] = {} + + # Extract from verify stage + verify_out = structured_outputs.get("verify", {}) + if isinstance(verify_out, dict): + if "build_success" in verify_out: + signals["build_success"] = bool(verify_out["build_success"]) + if "lint_clean" in verify_out: + signals["lint_clean"] = bool(verify_out["lint_clean"]) + + # Extract from test stage + test_out = structured_outputs.get("test", {}) + if isinstance(test_out, dict): + if "test_pass_rate" in test_out: + signals["test_pass_rate"] = float(test_out["test_pass_rate"]) + elif "tests_passed" in test_out and "tests_total" in test_out: + total = int(test_out["tests_total"]) + if total > 0: + signals["test_pass_rate"] = int(test_out["tests_passed"]) / total + + return signals diff --git a/platform/app/worker/executor.py b/platform/app/worker/executor.py index 60ba1ce..f31da88 100644 --- a/platform/app/worker/executor.py +++ b/platform/app/worker/executor.py @@ -803,6 +803,7 @@ async def execute_stage( gate_rejection_context: Optional[Dict[str, str]] = None, stage_timeout: Optional[float] = None, evaluator_config: Optional[dict] = None, + failure_redirect_context: Optional[Dict[str, str]] = None, ) -> str: """Execute a single stage: call AgentRunner and update DB/broadcast.""" now = datetime.now(timezone.utc) @@ -853,6 +854,7 @@ async def execute_stage( retry_context=retry_context, custom_instruction=custom_instruction, gate_rejection_context=gate_rejection_context, + failure_redirect_context=failure_redirect_context, ) user_prompt = build_user_prompt(ctx) @@ -1198,6 +1200,7 @@ async def execute_stage_sandboxed( stage_model: Optional[str] = None, custom_instruction: Optional[str] = None, gate_rejection_context: Optional[Dict[str, str]] = None, + failure_redirect_context: Optional[Dict[str, str]] = None, ) -> str: """Execute a stage inside a sandbox container via HTTP. @@ -1253,6 +1256,7 @@ async def execute_stage_sandboxed( retry_context=retry_context, custom_instruction=custom_instruction, gate_rejection_context=gate_rejection_context, + failure_redirect_context=failure_redirect_context, ) user_prompt = build_user_prompt(ctx) system_prompt = SYSTEM_PROMPTS.get(stage.agent_role, SYSTEM_PROMPTS["orchestrator"]) diff --git a/platform/app/worker/prompts.py b/platform/app/worker/prompts.py index 597b8b4..bac5376 100644 --- a/platform/app/worker/prompts.py +++ b/platform/app/worker/prompts.py @@ -45,6 +45,11 @@ "你需要生成:API文档、使用说明、变更日志和架构说明。" "文档应清晰、准确、易于理解,面向开发者和使用者。" ), + "verify": ( + "你是一个构建验证Agent,负责执行编译、Lint和类型检查等验证命令。" + "你只需要运行指定的验证命令并如实报告结果,不要修改任何文件。" + "如果所有命令都通过,报告成功;如果有失败,详细列出失败的命令和错误信息。" + ), } # --------------------------------------------------------------------------- @@ -124,10 +129,21 @@ "4. 遗留问题清单(如有)\n" "5. 最终签收结论" ), + "verify": ( + "请执行附加指令中的验证命令,逐一报告每条命令的结果:\n" + "1. 依次执行每条验证命令\n" + "2. 记录每条命令的退出码和输出\n" + "3. 汇总:全部通过 / 部分失败\n" + "4. 对失败的命令,列出完整错误信息" + ), } STAGE_GUARDRAILS: Dict[str, str] = { + "verify": ( + "只运行验证命令,不要修改任何文件。\n" + "不要尝试修复发现的问题,只如实报告验证结果。" + ), "code": ( "只完成当前阶段,不要提前执行后续阶段任务。\n" "你可以为了验证实现而运行必要命令,但不要提前生成最终签收/验收报告," @@ -169,6 +185,8 @@ class StageContext: custom_instruction: Optional[str] = None # Phase 1.3: Gate rejection feedback context gate_rejection_context: Optional[Dict[str, str]] = None # {"comment": ..., "retry": "2/3"} + # Harness: failure redirect context from downstream stage (verify/test → code) + failure_redirect_context: Optional[Dict[str, str]] = None # {"failed_stage": ..., "error": ..., "output": ...} def build_user_prompt(ctx: StageContext) -> str: @@ -221,6 +239,21 @@ def build_user_prompt(ctx: StageContext) -> str: parts.append(f"**上次部分输出:**\n{truncated}") parts.append("请分析失败原因,避免重复同样的错误,重新完成任务。") + # Inject failure redirect context (from downstream verify/test failure) + if ctx.failure_redirect_context: + failed_stage = ctx.failure_redirect_context.get("failed_stage", "") + redirect_error = ctx.failure_redirect_context.get("error", "") + redirect_output = ctx.failure_redirect_context.get("output", "") + parts.append(f"\n## ⚠ 后续阶段失败反馈(来自 {failed_stage} 阶段)") + if redirect_error: + parts.append(f"**失败原因:** {redirect_error}") + if redirect_output: + truncated_output = redirect_output[:2000] + if len(redirect_output) > 2000: + truncated_output += "\n...(已截断)" + parts.append(f"**失败阶段输出:** {truncated_output}") + parts.append("请分析上述失败原因,修改你的产出以解决这些问题。") + # Inject gate rejection feedback if this is a retry after gate rejection if ctx.gate_rejection_context: comment = ctx.gate_rejection_context.get("comment", "") diff --git a/platform/tests/test_evaluator.py b/platform/tests/test_evaluator.py new file mode 100644 index 0000000..8c5ed55 --- /dev/null +++ b/platform/tests/test_evaluator.py @@ -0,0 +1,144 @@ +"""Tests for Phase 1.3: External Evaluator — composite scoring.""" +from __future__ import annotations + +from app.worker.evaluator import ( + EvaluationResult, + compute_composite_score, + extract_signals_from_stage_outputs, +) + + +def test_compute_composite_score_all_pass(): + score = compute_composite_score( + llm_confidence=0.9, + external_signals={ + "test_pass_rate": 1.0, + "build_success": True, + "lint_clean": True, + }, + ) + assert score > 0.85 + + +def test_compute_composite_score_build_failed(): + score = compute_composite_score( + llm_confidence=0.9, + external_signals={ + "test_pass_rate": 1.0, + "build_success": False, + "lint_clean": True, + }, + ) + # build_success=False with weight 0.2 should pull score down significantly + assert score < 0.9 + + +def test_compute_composite_score_test_partial(): + score = compute_composite_score( + llm_confidence=0.8, + external_signals={ + "test_pass_rate": 0.5, + "build_success": True, + "lint_clean": True, + }, + ) + # 0.3*0.8 + 0.4*0.5 + 0.2*1.0 + 0.1*1.0 = 0.24 + 0.20 + 0.20 + 0.10 = 0.74 + assert 0.70 <= score <= 0.78 + + +def test_compute_composite_score_no_external_signals(): + score = compute_composite_score( + llm_confidence=0.75, + external_signals={}, + ) + # Should fall back to pure LLM confidence + assert score == 0.75 + + +def test_compute_composite_score_custom_weights(): + custom_weights = { + "llm_confidence": 0.5, + "test_pass_rate": 0.5, + } + score = compute_composite_score( + llm_confidence=1.0, + external_signals={"test_pass_rate": 0.0}, + weights=custom_weights, + ) + # 0.5*1.0 + 0.5*0.0 = 0.5 + assert abs(score - 0.5) < 0.01 + + +def test_compute_composite_score_clamps_to_range(): + score = compute_composite_score( + llm_confidence=1.5, # over 1.0 + external_signals={}, + ) + assert score == 1.0 + + score = compute_composite_score( + llm_confidence=-0.5, # under 0.0 + external_signals={}, + ) + assert score == 0.0 + + +def test_extract_signals_from_verify_output(): + structured_outputs = { + "verify": { + "build_success": True, + "lint_clean": False, + }, + } + signals = extract_signals_from_stage_outputs(structured_outputs) + assert signals["build_success"] is True + assert signals["lint_clean"] is False + + +def test_extract_signals_from_test_output(): + structured_outputs = { + "test": { + "tests_passed": 8, + "tests_total": 10, + }, + } + signals = extract_signals_from_stage_outputs(structured_outputs) + assert abs(signals["test_pass_rate"] - 0.8) < 0.01 + + +def test_extract_signals_from_test_output_with_rate(): + structured_outputs = { + "test": { + "test_pass_rate": 0.95, + }, + } + signals = extract_signals_from_stage_outputs(structured_outputs) + assert signals["test_pass_rate"] == 0.95 + + +def test_extract_signals_empty_outputs(): + signals = extract_signals_from_stage_outputs({}) + assert signals == {} + + +def test_extract_signals_mixed_outputs(): + structured_outputs = { + "verify": {"build_success": True, "lint_clean": True}, + "test": {"test_pass_rate": 1.0}, + } + signals = extract_signals_from_stage_outputs(structured_outputs) + assert signals["build_success"] is True + assert signals["lint_clean"] is True + assert signals["test_pass_rate"] == 1.0 + + +def test_evaluation_result_dataclass(): + result = EvaluationResult( + composite_score=0.85, + llm_confidence=0.9, + external_signals={"test_pass_rate": 0.8}, + passed=True, + details="All checks passed", + ) + assert result.composite_score == 0.85 + assert result.passed is True diff --git a/platform/tests/test_harness_loop.py b/platform/tests/test_harness_loop.py new file mode 100644 index 0000000..bcbc983 --- /dev/null +++ b/platform/tests/test_harness_loop.py @@ -0,0 +1,229 @@ +"""Tests for Phase 1.2: Test→Code feedback loop — failure redirect context passing.""" +from __future__ import annotations + +from types import SimpleNamespace + +import pytest + +from app.worker.prompts import StageContext, build_user_prompt + + +# ── build_user_prompt with failure_redirect_context ───────────── + + +def _make_ctx(**overrides) -> StageContext: + defaults = dict( + task_title="Test Task", + task_description="Test desc", + stage_name="code", + agent_role="coding", + prior_outputs=[], + ) + defaults.update(overrides) + return StageContext(**defaults) + + +def test_build_user_prompt_with_failure_redirect_context(): + ctx = _make_ctx( + failure_redirect_context={ + "failed_stage": "verify", + "error": "build failed: tsc error", + "output": "error TS2304: Cannot find name 'foo'", + }, + ) + prompt = build_user_prompt(ctx) + assert "后续阶段失败反馈" in prompt + assert "verify" in prompt + assert "build failed: tsc error" in prompt + assert "Cannot find name 'foo'" in prompt + assert "请分析上述失败原因" in prompt + + +def test_build_user_prompt_failure_redirect_truncates_long_output(): + long_output = "x" * 3000 + ctx = _make_ctx( + failure_redirect_context={ + "failed_stage": "test", + "error": "tests failed", + "output": long_output, + }, + ) + prompt = build_user_prompt(ctx) + assert "已截断" in prompt + # The truncated output should be at most 2000 chars + truncation marker + assert long_output[:2000] in prompt + assert long_output[:2001] not in prompt + + +def test_build_user_prompt_failure_redirect_none_ignored(): + ctx = _make_ctx(failure_redirect_context=None) + prompt = build_user_prompt(ctx) + assert "后续阶段失败反馈" not in prompt + + +def test_build_user_prompt_failure_redirect_empty_fields(): + ctx = _make_ctx( + failure_redirect_context={ + "failed_stage": "verify", + "error": "", + "output": "", + }, + ) + prompt = build_user_prompt(ctx) + assert "后续阶段失败反馈" in prompt + assert "verify" in prompt + # Empty error/output should not produce those sub-sections + assert "失败原因:" not in prompt + assert "失败阶段输出:" not in prompt + + +# ── Graph failure redirect context capture ────────────────────── + + +@pytest.mark.asyncio +async def test_graph_failure_redirect_captures_context(monkeypatch): + """Verify that _pending_redirect_contexts is populated when a stage fails.""" + # We'll directly test the redirect context logic by examining the + # _process_task_graph internals. To avoid running the full loop, + # we test the redirect context dict construction pattern. + redirect_contexts = {} + failed_stage_name = "verify" + redirect_target = "code" + error_msg = "Build failed" + output_summary = "tsc error output" + + # This mirrors the logic in _process_task_graph failure redirect block + redirect_contexts[redirect_target] = { + "failed_stage": failed_stage_name, + "error": error_msg, + "output": output_summary[:2000], + } + + assert redirect_target in redirect_contexts + ctx = redirect_contexts[redirect_target] + assert ctx["failed_stage"] == "verify" + assert ctx["error"] == "Build failed" + assert ctx["output"] == "tsc error output" + + +@pytest.mark.asyncio +async def test_graph_failure_redirect_passes_context_to_execute(monkeypatch): + """Verify _execute_single_stage receives failure_redirect_context.""" + from app.worker import engine + + captured_kwargs = {} + + async def mock_execute(*args, **kwargs): + captured_kwargs.update(kwargs) + return "output" + + monkeypatch.setattr(engine, "_execute_single_stage", mock_execute) + + # Simulate what the graph loop does: pop context and pass to execute + pending = {"code": {"failed_stage": "verify", "error": "err", "output": "out"}} + redirect_ctx = pending.pop("code", None) + + await engine._execute_single_stage( + None, None, None, 0, [], None, None, None, {}, + failure_redirect_context=redirect_ctx, + ) + + assert captured_kwargs["failure_redirect_context"] is not None + assert captured_kwargs["failure_redirect_context"]["failed_stage"] == "verify" + + +def test_graph_verify_fails_code_receives_error(): + """Unit test: verify failure → code gets verify's error context.""" + redirect_contexts = {} + + # Simulate verify failure + verify_error = "npm run build failed with exit code 1" + verify_output = "Error: module not found" + redirect_contexts["code"] = { + "failed_stage": "verify", + "error": verify_error, + "output": verify_output, + } + + # Simulate code receiving context + ctx = _make_ctx(failure_redirect_context=redirect_contexts.get("code")) + prompt = build_user_prompt(ctx) + assert "verify" in prompt + assert verify_error in prompt + assert verify_output in prompt + + +def test_graph_test_fails_code_receives_error(): + """Unit test: test failure → code gets test's error context.""" + ctx = _make_ctx( + failure_redirect_context={ + "failed_stage": "test", + "error": "3 tests failed", + "output": "FAILED test_login - AssertionError", + }, + ) + prompt = build_user_prompt(ctx) + assert "test" in prompt + assert "3 tests failed" in prompt + + +def test_graph_multi_loop_preserves_latest_error(): + """Second failure overwrites first redirect context for same target.""" + pending = {} + + # First failure from verify + pending["code"] = { + "failed_stage": "verify", + "error": "first error", + "output": "first output", + } + + # Second failure from test (overwrites) + pending["code"] = { + "failed_stage": "test", + "error": "second error", + "output": "second output", + } + + assert pending["code"]["failed_stage"] == "test" + assert pending["code"]["error"] == "second error" + + +def test_graph_redirect_resets_target_stage_status(): + """Redirect target's status should be reset to pending.""" + redirect_stage = SimpleNamespace( + status="completed", + error_message="old error", + output_summary="old output", + ) + + # Simulate the reset logic from _process_task_graph + redirect_stage.status = "pending" + redirect_stage.error_message = None + redirect_stage.output_summary = None + + assert redirect_stage.status == "pending" + assert redirect_stage.error_message is None + assert redirect_stage.output_summary is None + + +def test_graph_max_executions_stops_loop(): + """Once max_executions is reached, stage should not be re-executed.""" + from app.worker.graph import StageGraph + + stages = [ + {"name": "code", "order": 1, "max_executions": 2}, + {"name": "verify", "order": 2, "on_failure": "code", "max_executions": 2}, + ] + graph = StageGraph.from_template_stages(stages) + + # code already executed 2 times and is in failed — should not be ready + ready = graph.get_ready_stages( + completed=set(), + running=set(), + failed={"code"}, + skipped=set(), + execution_counts={"code": 2, "verify": 0}, + ) + # code is at max, verify depends on code being completed — nothing ready + assert ready == [] diff --git a/platform/tests/test_verify_stage.py b/platform/tests/test_verify_stage.py new file mode 100644 index 0000000..140d5d4 --- /dev/null +++ b/platform/tests/test_verify_stage.py @@ -0,0 +1,176 @@ +"""Tests for Phase 1.1: Verify Stage — role, prompts, tools, engine helpers, template.""" +from __future__ import annotations + +import json +from types import SimpleNamespace + +from app.worker.prompts import STAGE_GUARDRAILS, STAGE_INSTRUCTIONS, SYSTEM_PROMPTS + + +# ── Role & prompts ────────────────────────────────────────────── + + +def test_verify_role_in_system_prompts(): + assert "verify" in SYSTEM_PROMPTS + assert "验证" in SYSTEM_PROMPTS["verify"] + + +def test_verify_role_tools(): + from app.worker.agents import ROLE_TOOLS + + assert "verify" in ROLE_TOOLS + assert ROLE_TOOLS["verify"] == {"execute", "read"} + + +def test_verify_stage_instruction_exists(): + assert "verify" in STAGE_INSTRUCTIONS + assert "验证命令" in STAGE_INSTRUCTIONS["verify"] + + +def test_verify_guardrail_exists(): + assert "verify" in STAGE_GUARDRAILS + assert "不要修改" in STAGE_GUARDRAILS["verify"] + + +# ── _resolve_verify_commands ──────────────────────────────────── + + +def _make_task(project=None, template=None) -> SimpleNamespace: + return SimpleNamespace( + id="t-1", title="T", description="D", + project_id="p-1", project=project, + template=template, stages=[], + target_branch=None, status="running", + ) + + +def test_resolve_verify_commands_from_stage_def(): + from app.worker.engine import _resolve_verify_commands + + task = _make_task() + stage_defs = {"verify": {"verify_commands": ["npm run build"]}} + assert _resolve_verify_commands(task, stage_defs) == ["npm run build"] + + +def test_resolve_verify_commands_from_project(): + from app.worker.engine import _resolve_verify_commands + + project = SimpleNamespace( + verify_commands=json.dumps(["ruff check ."]), + tech_stack=None, + ) + task = _make_task(project=project) + assert _resolve_verify_commands(task, {}) == ["ruff check ."] + + +def test_resolve_verify_commands_auto_detect_typescript(): + from app.worker.engine import _resolve_verify_commands + + project = SimpleNamespace( + verify_commands=None, + tech_stack=["typescript", "react"], + ) + task = _make_task(project=project) + cmds = _resolve_verify_commands(task, {}) + assert cmds is not None + assert any("tsc" in c for c in cmds) + + +def test_resolve_verify_commands_auto_detect_python(): + from app.worker.engine import _resolve_verify_commands + + project = SimpleNamespace( + verify_commands=None, + tech_stack=["python", "fastapi"], + ) + task = _make_task(project=project) + cmds = _resolve_verify_commands(task, {}) + assert cmds is not None + assert any("ruff" in c for c in cmds) + + +def test_resolve_verify_commands_returns_none_no_config(): + from app.worker.engine import _resolve_verify_commands + + task = _make_task() + assert _resolve_verify_commands(task, {}) is None + + +# ── _template_needs_graph ─────────────────────────────────────── + + +def _make_template(stages: list) -> SimpleNamespace: + return SimpleNamespace(stages=stages) + + +def test_template_needs_graph_with_depends_on(): + from app.worker.engine import _template_needs_graph + + tpl = _make_template([{"name": "a", "depends_on": ["b"]}]) + task = _make_task(template=tpl) + assert _template_needs_graph(task) is True + + +def test_template_needs_graph_with_on_failure(): + from app.worker.engine import _template_needs_graph + + tpl = _make_template([{"name": "a", "on_failure": "b"}]) + task = _make_task(template=tpl) + assert _template_needs_graph(task) is True + + +def test_template_needs_graph_linear_returns_false(): + from app.worker.engine import _template_needs_graph + + tpl = _make_template([{"name": "a", "order": 0}, {"name": "b", "order": 1}]) + task = _make_task(template=tpl) + assert _template_needs_graph(task) is False + + +def test_template_needs_graph_no_template(): + from app.worker.engine import _template_needs_graph + + task = _make_task(template=None) + assert _template_needs_graph(task) is False + + +# ── harness_pipeline template structure ───────────────────────── + + +def test_harness_pipeline_template_structure(): + from app.services.template_service import BUILTIN_TEMPLATES + + harness = None + for t in BUILTIN_TEMPLATES: + if t["name"] == "harness_pipeline": + harness = t + break + + assert harness is not None, "harness_pipeline template not found" + assert harness["display_name"] == "闭环流水线" + + stage_names = [s["name"] for s in harness["stages"]] + assert "parse" in stage_names + assert "spec" in stage_names + assert "code" in stage_names + assert "verify" in stage_names + assert "test" in stage_names + assert "review" in stage_names + assert "signoff" in stage_names + + # Verify → code on_failure + verify_stage = next(s for s in harness["stages"] if s["name"] == "verify") + assert verify_stage["on_failure"] == "code" + assert verify_stage["max_executions"] == 3 + + # Test → code on_failure + test_stage = next(s for s in harness["stages"] if s["name"] == "test") + assert test_stage["on_failure"] == "code" + + # Code stage has max_executions + code_stage = next(s for s in harness["stages"] if s["name"] == "code") + assert code_stage["max_executions"] == 3 + + # All stages have depends_on (graph mode) + for s in harness["stages"]: + assert "depends_on" in s, f"Stage {s['name']} missing depends_on" diff --git a/platform/tests/test_worker_graph.py b/platform/tests/test_worker_graph.py index 9cddffc..d51975d 100644 --- a/platform/tests/test_worker_graph.py +++ b/platform/tests/test_worker_graph.py @@ -108,3 +108,34 @@ def test_validate_reports_unknown_dependencies_and_cycles(): errors = graph.validate() assert any("unknown stage 'missing'" in e for e in errors) assert any("Cycle detected" in e for e in errors) + + +def test_harness_pipeline_graph_failure_redirect(): + """Verify harness_pipeline template builds a valid graph with redirect paths.""" + from app.services.template_service import BUILTIN_TEMPLATES + + harness = next(t for t in BUILTIN_TEMPLATES if t["name"] == "harness_pipeline") + graph = StageGraph.from_template_stages(harness["stages"]) + + # Graph should be valid + errors = graph.validate() + assert errors == [], f"Graph validation errors: {errors}" + + # verify → code redirect + assert graph.get_failure_redirect("verify") == "code" + # test → code redirect + assert graph.get_failure_redirect("test") == "code" + # code has no redirect + assert graph.get_failure_redirect("code") is None + + # After spec completes, code should be ready + ready = graph.get_ready_stages( + completed={"parse", "spec"}, running=set(), failed=set(), skipped=set(), + ) + assert any(n.name == "code" for n in ready) + + # After code completes, verify should be ready + ready = graph.get_ready_stages( + completed={"parse", "spec", "code"}, running=set(), failed=set(), skipped=set(), + ) + assert any(n.name == "verify" for n in ready)