Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions platform/app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ class Settings(BaseSettings):
CONDITIONS_ENABLED: bool = True
EVALUATOR_DEFAULT_MIN_CONFIDENCE: float = 0.7
EVALUATOR_MAX_ITERATIONS: int = 3
EVALUATOR_USE_EXTERNAL_SIGNALS: bool = True
EVALUATOR_COMPOSITE_WEIGHTS: str = '{"test_pass_rate": 0.4, "build_success": 0.2, "lint_clean": 0.1, "llm_confidence": 0.3}'
DYNAMIC_GATE_ENABLED: bool = False
DYNAMIC_GATE_CONFIDENCE_THRESHOLD: float = 0.5
STAGE_DEFAULT_MAX_RETRIES: int = 3
Expand Down
1 change: 1 addition & 0 deletions platform/app/models/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class ProjectModel(Base):
tech_stack: Mapped[Optional[list]] = mapped_column(JSON, nullable=True)
repo_tree: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
sandbox_image: Mapped[Optional[str]] = mapped_column(String(200), nullable=True)
verify_commands: Mapped[Optional[str]] = mapped_column(Text, nullable=True, default=None)
last_synced_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True), nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now()
Expand Down
3 changes: 3 additions & 0 deletions platform/app/schemas/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class ProjectCreateRequest(BaseModel):
branch: str = "main"
description: Optional[str] = None
sandbox_image: Optional[str] = None
verify_commands: Optional[List[str]] = None


class ProjectUpdateRequest(BaseModel):
Expand All @@ -24,6 +25,7 @@ class ProjectUpdateRequest(BaseModel):
description: Optional[str] = None
status: Optional[str] = None
sandbox_image: Optional[str] = None
verify_commands: Optional[List[str]] = None


class ProjectResponse(BaseModel):
Expand All @@ -38,6 +40,7 @@ class ProjectResponse(BaseModel):
tech_stack: Optional[List[str]] = None
repo_tree: Optional[str] = None
sandbox_image: Optional[str] = None
verify_commands: Optional[List[str]] = None
last_synced_at: Optional[datetime] = None
created_at: datetime
updated_at: datetime
Expand Down
2 changes: 2 additions & 0 deletions platform/app/schemas/template.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ class StageDefinition(BaseModel):
depends_on: Optional[List[str]] = None
on_failure: Optional[str] = None
max_executions: int = 1
# Harness: verify commands for verify stages
verify_commands: Optional[List[str]] = None
# Phase 3.3: Dynamic routing
routing: Optional[dict] = None

Expand Down
15 changes: 15 additions & 0 deletions platform/app/services/template_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,21 @@
{"after_stage": "review", "type": "human_approve"},
],
},
{
"name": "harness_pipeline",
"display_name": "闭环流水线",
"description": "Code→Verify→Test 自动闭环流水线,验证/测试失败时自动回退重试,最多3轮",
"stages": [
{"name": "parse", "agent_role": "orchestrator", "order": 0, "depends_on": []},
{"name": "spec", "agent_role": "spec", "order": 1, "depends_on": ["parse"]},
{"name": "code", "agent_role": "coding", "order": 2, "depends_on": ["spec"], "max_executions": 3},
{"name": "verify", "agent_role": "verify", "order": 3, "depends_on": ["code"], "on_failure": "code", "max_executions": 3},
{"name": "test", "agent_role": "test", "order": 4, "depends_on": ["verify"], "on_failure": "code", "max_executions": 3},
{"name": "review", "agent_role": "review", "order": 5, "depends_on": ["test"]},
{"name": "signoff", "agent_role": "orchestrator", "order": 6, "depends_on": ["review"]},
],
"gates": [],
},
{
"name": "custom",
"display_name": "自定义",
Expand Down
3 changes: 3 additions & 0 deletions platform/app/worker/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
"coding": 8,
"doc": 5,
"test": 8,
"verify": 3,
}
_DEFAULT_MAX_TURNS = 5

Expand All @@ -45,6 +46,7 @@
"review": {"read", "execute", "skill"},
"smoke": {"read", "execute", "skill"},
"doc": {"read", "write", "edit", "skill"},
"verify": {"execute", "read"},
}
_ALL_TOOLS: set[str] = set()
_TOOL_ARGUMENT_HINTS: dict[str, str] = {}
Expand All @@ -62,6 +64,7 @@
"review": ["shared", "review"],
"smoke": ["shared", "smoke"],
"doc": ["shared", "doc"],
"verify": ["shared"],
}


Expand Down
93 changes: 91 additions & 2 deletions platform/app/worker/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -1188,11 +1188,12 @@
if settings.MEMORY_ENABLED and task.project_id:
try:
from app.worker.memory import ProjectMemoryStore
project_memory_store = ProjectMemoryStore(str(task.project_id))
except Exception:
logger.warning("Failed to init memory store for project %s", task.project_id, exc_info=True)

Check warning on line 1193 in platform/app/worker/engine.py

View workflow job for this annotation

GitHub Actions / backend-test

Missing coverage

Missing coverage on lines 1191-1193
# Phase 3.1: Graph-based execution when enabled
if settings.GRAPH_EXECUTION_ENABLED and task.template:
# Phase 3.1: Graph-based execution when enabled or when template uses graph features
use_graph = settings.GRAPH_EXECUTION_ENABLED or _template_needs_graph(task)
if use_graph and task.template:
await _process_task_graph(
session, task, sorted_stages, stage_defs, gates,
prior_outputs, compression, structured_outputs,
Expand Down Expand Up @@ -1440,6 +1441,70 @@
)


def _template_needs_graph(task: TaskModel) -> bool:
    """Return True when the task's template relies on graph-only stage features.

    A template needs graph execution when at least one of its stage
    definitions carries a truthy ``depends_on`` or ``on_failure`` entry.

    Args:
        task: Task whose ``template.stages`` is inspected. The stages may be a
            list of dicts or a JSON string that decodes to one.

    Returns:
        True if any stage definition declares ``depends_on`` or
        ``on_failure``; False when there is no template, the stages are not a
        list, or the stages cannot be read or decoded.
    """
    if not task.template:
        return False

    try:
        stages_raw = task.template.stages
        if isinstance(stages_raw, str):
            stages_raw = json.loads(stages_raw)
    except Exception:
        # Detection stays best-effort (the caller falls back to its default
        # execution mode), but the failure is logged so a template that
        # silently loses its graph features can be diagnosed.
        logger.warning(
            "Failed to inspect template stages for graph features",
            exc_info=True,
        )
        return False

    if not isinstance(stages_raw, list):
        return False

    # Non-dict entries are ignored rather than treated as errors.
    return any(
        isinstance(stage_def, dict)
        and bool(stage_def.get("depends_on") or stage_def.get("on_failure"))
        for stage_def in stages_raw
    )


def _resolve_verify_commands(
    task: TaskModel,
    stage_defs: Dict[str, dict],
) -> Optional[List[str]]:
    """Resolve the commands a verify stage should run.

    Sources are consulted in priority order and the first non-empty one wins:

    1. ``verify_commands`` on the stage definition stored under ``"verify"``.
    2. ``verify_commands`` on the task's project (a list, or a JSON string
       that decodes to a list).
    3. Commands derived from the project's ``tech_stack``.

    Args:
        task: Task being executed; only ``task.project`` is read.
        stage_defs: Stage definitions keyed by stage name.

    Returns:
        The resolved command list, or None when no source yields any command.
    """
    # 1. Stage-level verify_commands. A missing or non-dict definition is
    #    treated as "nothing configured" instead of raising AttributeError.
    verify_def = stage_defs.get("verify")
    if isinstance(verify_def, dict):
        stage_cmds = verify_def.get("verify_commands")
        if stage_cmds:
            return stage_cmds

    project = task.project
    if not project:
        return None

    # 2. Project-level verify_commands.
    raw = getattr(project, "verify_commands", None)
    if raw:
        try:
            project_cmds = json.loads(raw) if isinstance(raw, str) else raw
        except (json.JSONDecodeError, TypeError):
            project_cmds = None
        if isinstance(project_cmds, list) and project_cmds:
            return project_cmds

    # 3. Auto-detect from tech_stack.
    tech_stack = getattr(project, "tech_stack", None) or []
    if isinstance(tech_stack, str):
        try:
            tech_stack = json.loads(tech_stack)
        except (json.JSONDecodeError, TypeError):
            tech_stack = []
    # A JSON scalar, null or object is not a usable stack listing; iterating
    # it would raise or match on individual characters.
    if not isinstance(tech_stack, (list, tuple, set, frozenset)):
        tech_stack = []

    tech_lower = {str(entry).lower() for entry in tech_stack}
    detected: List[str] = []
    if not tech_lower.isdisjoint(("typescript", "nextjs", "react", "vue", "angular")):
        detected.append("npx tsc --noEmit")
        detected.append("npm run lint --if-present")
    if not tech_lower.isdisjoint(("python", "fastapi", "django", "flask")):
        detected.append("ruff check .")
        detected.append("python -m py_compile *.py")

    return detected or None


async def _process_task_graph(
session: AsyncSession,
task: TaskModel,
Expand Down Expand Up @@ -1515,6 +1580,9 @@
skipped.add(stage.stage_name)
execution_counts[stage.stage_name] = stage.execution_count

# Harness: failure redirect context channel — pass error info to redirect target
_pending_redirect_contexts: Dict[str, Dict[str, str]] = {}

max_iterations = settings.GRAPH_MAX_LOOP_ITERATIONS * len(graph.nodes)
iteration = 0

Expand Down Expand Up @@ -1564,11 +1632,15 @@
stage.execution_count = execution_counts[node.name]
await session.commit()

# Consume pending redirect context for this stage
redirect_ctx = _pending_redirect_contexts.pop(node.name, None)

result = await _execute_single_stage(
session, task, stage, stage_index,
prior_outputs, compression, project_memory_store,
repo_context, stage_defs, workspace_path, sandbox_info,
sandbox_required_error=sandbox_required_error,
failure_redirect_context=redirect_ctx,
)
if result is None:
failed.add(node.name)
Expand All @@ -1578,6 +1650,12 @@
logger.info(
"Stage %s failed, redirecting to %s", node.name, redirect,
)
# Capture failure context before resetting
_pending_redirect_contexts[redirect] = {
"failed_stage": node.name,
"error": stage.error_message or "unknown error",
"output": (stage.output_summary or "")[:2000],
}
# Reset the redirect target for re-execution
redirect_stage = stage_map[redirect]
redirect_stage.status = "pending"
Expand Down Expand Up @@ -1682,6 +1760,7 @@
sandbox_info=None,
gate_rejection_context: Optional[Dict[str, str]] = None,
sandbox_required_error: Optional[str] = None,
failure_redirect_context: Optional[Dict[str, str]] = None,
) -> Optional[str]:
"""Execute a single stage with model routing and retry context.

Expand All @@ -1694,6 +1773,14 @@
stage_timeout = sdef.get("timeout") # Phase 1.4
evaluator_config = sdef.get("evaluator") # Phase 2.2

# Harness: auto-inject verify commands as custom_instruction for verify stages
if stage.agent_role == "verify" and not custom_instruction:
verify_cmds = _resolve_verify_commands(task, stage_defs)
if verify_cmds:
custom_instruction = "请依次执行以下验证命令:\n" + "\n".join(

Check warning on line 1780 in platform/app/worker/engine.py

View workflow job for this annotation

GitHub Actions / backend-test

Missing coverage

Missing coverage on lines 1778-1780
f"- `{cmd}`" for cmd in verify_cmds
)

# Build project memory for the current role
project_memory: Optional[str] = None
if project_memory_store:
Expand Down Expand Up @@ -1820,6 +1907,7 @@
stage_model=stage_model,
custom_instruction=custom_instruction,
gate_rejection_context=gate_rejection_context,
failure_redirect_context=failure_redirect_context,
)
else:
output = await execute_stage(
Expand All @@ -1834,6 +1922,7 @@
gate_rejection_context=gate_rejection_context,
stage_timeout=stage_timeout,
evaluator_config=evaluator_config,
failure_redirect_context=failure_redirect_context,
)
return output
except Exception as e:
Expand Down
127 changes: 127 additions & 0 deletions platform/app/worker/evaluator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
"""External evaluator: composite scoring from LLM confidence + external signals."""
from __future__ import annotations

import json
import logging
from dataclasses import dataclass, field
from typing import Any, Dict, Optional

from app.config import settings

logger = logging.getLogger(__name__)

DEFAULT_WEIGHTS: Dict[str, float] = {
"llm_confidence": 0.3,
"test_pass_rate": 0.4,
"build_success": 0.2,
"lint_clean": 0.1,
}


@dataclass
class EvaluationResult:
    """Container for the outcome of one composite evaluation.

    NOTE(review): how ``passed`` and ``details`` are populated is not visible
    in this module; confirm against the code that builds this object.
    """

    # Composite score; expected range 0.0 - 1.0.
    composite_score: float  # 0.0 - 1.0
    # LLM confidence value that went into the evaluation.
    llm_confidence: float
    # External signals used for scoring; presumably the same keys that
    # compute_composite_score reads (test_pass_rate, build_success,
    # lint_clean) - TODO confirm.
    external_signals: Dict[str, Any] = field(default_factory=dict)
    # Pass/fail verdict; defaults to False until set by the producer.
    passed: bool = False
    # Free-form text; defaults to an empty string.
    details: str = ""


def _load_weights() -> Dict[str, float]:
    """Load composite weights from config, falling back to defaults.

    ``settings.EVALUATOR_COMPOSITE_WEIGHTS`` is expected to hold a JSON object
    mapping signal names to weights.

    Returns:
        The configured weights as floats. Entries whose value is not a finite,
        non-negative number are dropped with a warning, because they would
        break score computation later. A copy of ``DEFAULT_WEIGHTS`` is
        returned when the setting is not valid JSON or not a JSON object.
    """
    try:
        parsed = json.loads(settings.EVALUATOR_COMPOSITE_WEIGHTS)
    except (json.JSONDecodeError, TypeError):
        logger.warning(
            "EVALUATOR_COMPOSITE_WEIGHTS is not valid JSON; using default weights",
            exc_info=True,
        )
        return dict(DEFAULT_WEIGHTS)

    if not isinstance(parsed, dict):
        logger.warning(
            "EVALUATOR_COMPOSITE_WEIGHTS must be a JSON object, got %s; "
            "using default weights",
            type(parsed).__name__,
        )
        return dict(DEFAULT_WEIGHTS)

    weights: Dict[str, float] = {}
    for name, value in parsed.items():
        # bool is an int subclass, but true/false is not a meaningful weight.
        is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
        # The chained comparison also rejects NaN and infinity.
        if is_number and 0 <= value < float("inf"):
            weights[name] = float(value)
        else:
            logger.warning("Ignoring invalid evaluator weight %s=%r", name, value)
    return weights

Check warning on line 38 in platform/app/worker/evaluator.py

View workflow job for this annotation

GitHub Actions / backend-test

Missing coverage

Missing coverage on lines 36-38


def _clamp_unit(value: float) -> float:
    """Clamp *value* into the closed interval [0.0, 1.0]."""
    return max(0.0, min(1.0, value))


def compute_composite_score(
    llm_confidence: float,
    external_signals: Dict[str, Any],
    weights: Optional[Dict[str, float]] = None,
) -> float:
    """Compute weighted composite score from LLM confidence and external signals.

    The result is the weighted average of the components that are actually
    present: the weighted sum is divided by the sum of the weights that took
    part. Missing signals therefore neither penalise nor inflate the score,
    and weights do not have to sum to 1.

    Args:
        llm_confidence: LLM self-assessed confidence (0.0 - 1.0); values
            outside the range are clamped.
        external_signals: Dict with optional keys: test_pass_rate (float 0-1),
            build_success (bool), lint_clean (bool).
        weights: Optional custom weights. If None, uses config/defaults.

    Returns:
        Composite score between 0.0 and 1.0. With no external signals, or when
        the participating weights sum to zero or less, the clamped LLM
        confidence is returned.

    Raises:
        TypeError, ValueError: If ``test_pass_rate`` cannot be converted to
            float.
    """
    if weights is None:
        weights = _load_weights()

    confidence = _clamp_unit(llm_confidence)

    # If no external signals, fall back to pure LLM confidence.
    if not external_signals:
        return confidence

    # Each entry: (weight key, fallback weight, component value in [0, 1]).
    components = [("llm_confidence", 0.3, confidence)]
    if "test_pass_rate" in external_signals:
        rate = float(external_signals["test_pass_rate"])
        components.append(("test_pass_rate", 0.4, _clamp_unit(rate)))
    if "build_success" in external_signals:
        built = 1.0 if external_signals["build_success"] else 0.0
        components.append(("build_success", 0.2, built))
    if "lint_clean" in external_signals:
        clean = 1.0 if external_signals["lint_clean"] else 0.0
        components.append(("lint_clean", 0.1, clean))

    score = 0.0
    total_weight = 0.0
    for key, fallback_weight, value in components:
        weight = weights.get(key, fallback_weight)
        score += weight * value
        total_weight += weight

    if total_weight <= 0:
        return confidence

    # Normalise by the weight that actually participated. Scaling by the full
    # configured weight sum as well would push partial-signal scores above the
    # true weighted average.
    return _clamp_unit(score / total_weight)


def _coerce_flag(value: Any) -> Optional[bool]:
    """Interpret *value* as a boolean signal, or None when it is unreadable."""
    if isinstance(value, str):
        # bool("false") is True, so textual flags need explicit parsing.
        text = value.strip().lower()
        if text in ("true", "yes", "y", "1", "ok", "pass", "passed", "success"):
            return True
        if text in ("false", "no", "n", "0", "", "fail", "failed", "failure", "error"):
            return False
        return None
    return bool(value)


def _safe_cast(value: Any, caster: type) -> Optional[Any]:
    """Return ``caster(value)``, or None when the conversion fails or is NaN."""
    try:
        result = caster(value)
    except (TypeError, ValueError, OverflowError):
        return None
    # NaN is the only value that compares unequal to itself.
    return None if result != result else result


def extract_signals_from_stage_outputs(
    structured_outputs: Dict[str, dict],
) -> Dict[str, Any]:
    """Extract external signals from verify/test stage structured outputs.

    Looks for known keys in stage output_structured fields:
    - verify stage: build_success (bool), lint_clean (bool)
    - test stage: test_pass_rate (float), tests_passed (int), tests_total (int)

    Values are coerced defensively: textual flags such as ``"false"`` are
    parsed instead of being treated as truthy, and a value that cannot be
    interpreted causes that signal to be omitted rather than raising. When
    ``test_pass_rate`` is absent or unusable, the rate is derived from
    ``tests_passed`` / ``tests_total`` if both are usable and the total is
    positive.

    Args:
        structured_outputs: Structured outputs keyed by stage name; entries
            that are not dicts are ignored.

    Returns:
        Dict containing any of ``build_success``, ``lint_clean`` and
        ``test_pass_rate`` that could be determined.
    """
    signals: Dict[str, Any] = {}

    # Extract from verify stage.
    verify_out = structured_outputs.get("verify", {})
    if isinstance(verify_out, dict):
        for key in ("build_success", "lint_clean"):
            if key in verify_out:
                flag = _coerce_flag(verify_out[key])
                if flag is not None:
                    signals[key] = flag

    # Extract from test stage.
    test_out = structured_outputs.get("test", {})
    if isinstance(test_out, dict):
        rate = None
        if "test_pass_rate" in test_out:
            rate = _safe_cast(test_out["test_pass_rate"], float)
        if rate is None and "tests_passed" in test_out and "tests_total" in test_out:
            total = _safe_cast(test_out["tests_total"], int)
            passed = _safe_cast(test_out["tests_passed"], int)
            if total is not None and passed is not None and total > 0:
                rate = passed / total
        if rate is not None:
            signals["test_pass_rate"] = rate

    return signals
Loading
Loading