Merged
Changes from 3 commits
8 changes: 7 additions & 1 deletion apps/backend/prompts/spec_writer.md
@@ -24,7 +24,7 @@ You MUST create `spec.md` with ALL required sections (see template below).
## PHASE 0: LOAD ALL CONTEXT (MANDATORY)

```bash
-# Read all input files
+# Read all input files (some may not exist for greenfield/empty projects)
cat project_index.json
cat requirements.json
cat context.json
@@ -35,6 +35,12 @@ Extract from these files:
- **From requirements.json**: Task description, workflow type, services, acceptance criteria
- **From context.json**: Files to modify, files to reference, patterns

**IMPORTANT**: If any input file is missing, empty, or shows 0 files, this is likely a **greenfield/new project**. Adapt accordingly:
- Skip sections that reference existing code (e.g., "Files to Modify", "Patterns to Follow")
- Instead, focus on files to CREATE and the initial project structure
- Define the tech stack, dependencies, and setup instructions from scratch
- Use industry best practices as patterns rather than referencing existing code

---

## PHASE 1: ANALYZE CONTEXT
50 changes: 46 additions & 4 deletions apps/backend/spec/phases/spec_phases.py
@@ -6,18 +6,54 @@
"""

import json
from typing import TYPE_CHECKING
from pathlib import Path

from .. import validator, writer
from ..discovery import get_project_index_stats
from .models import MAX_RETRIES, PhaseResult

if TYPE_CHECKING:
    pass


def _is_greenfield_project(spec_dir: Path) -> bool:
    """Check if the project is empty/greenfield (0 discovered files)."""
    stats = get_project_index_stats(spec_dir)
    if not stats:
        return False  # Can't determine - don't assume greenfield
    return stats.get("file_count", 0) == 0
Comment on lines +16 to +21
🧹 Nitpick | 🔵 Trivial

Good fix for the false-positive greenfield detection on missing/corrupt index.

The if not stats: return False guard correctly prevents misclassifying projects when get_project_index_stats returns {} due to a missing or unparsable index file.

One remaining edge case: if project_index.json exists and parses as valid JSON but uses an unrecognized format (no "files" or "services" key), get_project_index_stats returns {"file_count": 0, ...} — a non-empty dict that passes the guard and yields file_count == 0 → True. This would be a false positive on a valid-but-unrecognized index format. Consider additionally checking that "file_count" is actually present in stats as a key (rather than relying on the default):

Proposed hardening
```diff
 def _is_greenfield_project(spec_dir: Path) -> bool:
     """Check if the project is empty/greenfield (0 discovered files)."""
     stats = get_project_index_stats(spec_dir)
     if not stats:
         return False  # Can't determine - don't assume greenfield
-    return stats.get("file_count", 0) == 0
+    if "file_count" not in stats:
+        return False  # Unrecognized format - don't assume greenfield
+    return stats["file_count"] == 0
```
🤖 Prompt for AI Agents
In `@apps/backend/spec/phases/spec_phases.py` around lines 16 - 21, The current
_is_greenfield_project uses get_project_index_stats and treats any non-empty
dict as valid; to avoid false positives when the index has an unrecognized
format, first verify that the "file_count" key exists in the returned stats
(e.g., check '"file_count" in stats') before comparing its value, and return
False if that key is missing so only explicit file_count values are treated as
greenfield indicators.
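The unrecognized-format edge case is easy to reproduce in isolation. A minimal sketch of the hardened check, with sample dicts standing in for `get_project_index_stats` output (the standalone helper name is illustrative):

```python
def is_greenfield(stats: dict) -> bool:
    """Hardened check: only an explicit file_count of 0 indicates greenfield."""
    if not stats:
        return False  # Missing or unparsable index - don't assume greenfield
    if "file_count" not in stats:
        return False  # Unrecognized format - don't assume greenfield
    return stats["file_count"] == 0


print(is_greenfield({}))                 # False: missing/corrupt index
print(is_greenfield({"schema": "v2"}))   # False: valid JSON, unrecognized shape
print(is_greenfield({"file_count": 0}))  # True: explicitly empty project
```

With the original `stats.get("file_count", 0) == 0`, the second call would return True, which is exactly the false positive the comment describes.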



def _greenfield_context() -> str:
    """Return additional context for greenfield/empty projects."""
    return """
**GREENFIELD PROJECT**: This is an empty or new project with no existing code.
There are no existing files to reference or modify. You are creating everything from scratch.

Adapt your approach:
- Do NOT reference existing files, patterns, or code structures
- Focus on what needs to be CREATED, not modified
- Define the initial project structure, files, and directories
- Specify the tech stack, frameworks, and dependencies to install
- Provide setup instructions for the new project
- For "Files to Modify" and "Files to Reference" sections, list files to CREATE instead
- For "Patterns to Follow", describe industry best practices rather than existing code
"""


class SpecPhaseMixin:
    """Mixin for spec writing and critique phase methods."""

    def _check_and_log_greenfield(self) -> bool:
        """Check if the project is greenfield and log if so.

        Returns:
            True if the project is greenfield (no existing files).
        """
        is_greenfield = _is_greenfield_project(self.spec_dir)
        if is_greenfield:
            self.ui.print_status(
                "Greenfield project detected - adapting spec for new project", "info"
            )
        return is_greenfield
Comment on lines +44 to +55
🧹 Nitpick | 🔵 Trivial

Greenfield detection not captured in the persistent task log.

_check_and_log_greenfield prints via self.ui.print_status, which only reaches the console. The detection event isn't logged through self.task_logger, so it won't appear in the task's persistent log history and will be invisible to any post-mortem debugging of greenfield misdetections.

♻️ Suggested addition
```diff
 def _check_and_log_greenfield(self) -> bool:
     is_greenfield = _is_greenfield_project(self.spec_dir)
     if is_greenfield:
         self.ui.print_status(
             "Greenfield project detected - adapting spec for new project", "info"
         )
+        if hasattr(self, "task_logger") and self.task_logger is not None:
+            self.task_logger.log_info("Greenfield project detected: no existing files found")
     return is_greenfield
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/backend/spec/phases/spec_phases.py` around lines 44 - 55, The greenfield
detection currently only prints to console via self.ui.print_status in
_check_and_log_greenfield, so add a persistent task log entry using
self.task_logger (e.g., self.task_logger.info or appropriate level) when
_is_greenfield_project(self.spec_dir) returns True; mirror the same message
("Greenfield project detected - adapting spec for new project") and include any
context (spec_dir) so the event appears in the task's persistent logs for
post-mortem debugging.
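The guarded-logging pattern from the suggestion can be exercised standalone. A minimal sketch with a stub logger (the stub class and free function are hypothetical; only the `log_info` call mirrors the suggestion):

```python
class StubTaskLogger:
    """Records messages so the persistent-log behavior can be checked."""

    def __init__(self) -> None:
        self.entries: list[str] = []

    def log_info(self, message: str) -> None:
        self.entries.append(message)


def log_greenfield_detected(task_logger, spec_dir: str) -> None:
    """Write to the persistent log only if a logger is attached; never raise."""
    if task_logger is not None:
        task_logger.log_info(
            f"Greenfield project detected in {spec_dir}: no existing files found"
        )


logger = StubTaskLogger()
log_greenfield_detected(logger, "specs/demo")
log_greenfield_detected(None, "specs/demo")  # Safe no-op without a logger
print(logger.entries)
```

The None-check keeps console-only configurations working while making the detection visible in the persistent history when a logger exists.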


    async def phase_quick_spec(self) -> PhaseResult:
        """Quick spec for simple tasks - combines context and spec in one step."""
        spec_file = self.spec_dir / "spec.md"
@@ -29,6 +65,8 @@ async def phase_quick_spec(self) -> PhaseResult:
                "quick_spec", True, [str(spec_file), str(plan_file)], [], 0
            )

        is_greenfield = self._check_and_log_greenfield()

        errors = []
        for attempt in range(MAX_RETRIES):
            self.ui.print_status(
@@ -42,7 +80,7 @@ async def phase_quick_spec(self) -> PhaseResult:

This is a SIMPLE task. Create a minimal spec and implementation plan directly.
No research or extensive analysis needed.

{_greenfield_context() if is_greenfield else ""}
Create:
1. A concise spec.md with just the essential sections
2. A simple implementation_plan.json with 1-2 subtasks
@@ -80,6 +118,9 @@ async def phase_spec_writing(self) -> PhaseResult:
                "spec.md exists but has issues, regenerating...", "warning"
            )

        is_greenfield = self._check_and_log_greenfield()
        greenfield_ctx = _greenfield_context() if is_greenfield else ""

        errors = []
        for attempt in range(MAX_RETRIES):
            self.ui.print_status(
@@ -88,6 +129,7 @@ async def phase_spec_writing(self) -> PhaseResult:

            success, output = await self.run_agent_fn(
                "spec_writer.md",
                additional_context=greenfield_ctx,
                phase_name="spec_writing",
            )
37 changes: 24 additions & 13 deletions apps/backend/spec/pipeline/orchestrator.py
@@ -6,6 +6,7 @@
"""

import json
import types
from collections.abc import Callable
from pathlib import Path

@@ -18,6 +19,7 @@
from task_logger import (
    LogEntryType,
    LogPhase,
    TaskLogger,
    get_task_logger,
)
from ui import (
@@ -238,6 +240,9 @@ async def run(self, interactive: bool = True, auto_approve: bool = False) -> boo
        task_logger.start_phase(LogPhase.PLANNING, "Starting spec creation process")
        TaskEventEmitter.from_spec_dir(self.spec_dir).emit("PLANNING_STARTED")

        # Track whether we've already ended the planning phase (to avoid double-end)
        self._planning_phase_ended = False

        try:
            return await self._run_phases(interactive, auto_approve, task_logger, ui)
        except Exception as e:
@@ -251,22 +256,24 @@ async def run(self, interactive: bool = True, auto_approve: bool = False) -> boo
            )
        except Exception:
            pass  # Don't mask the original error
-        try:
-            task_logger.end_phase(
-                LogPhase.PLANNING,
-                success=False,
-                message=f"Spec creation crashed: {e}",
-            )
-        except Exception:
-            pass  # Best effort - don't mask the original error when logging fails
+        if not self._planning_phase_ended:
+            self._planning_phase_ended = True
+            try:
+                task_logger.end_phase(
+                    LogPhase.PLANNING,
+                    success=False,
+                    message=f"Spec creation crashed: {e}",
+                )
+            except Exception:
+                pass  # Best effort - don't mask the original error when logging fails
        raise
Comment on lines 243 to 269
high

The try...except block is a great addition for ensuring the planning phase always terminates. However, the planning_phase_ended flag is a local variable in the run method and is not shared with _run_phases. This means if _run_phases calls task_logger.end_phase() and then an unexpected exception occurs before it returns, task_logger.end_phase() will be called a second time in this except block, which could lead to issues.

To fix this, planning_phase_ended should be an instance variable (e.g., self.planning_phase_ended):

  1. Initialize self.planning_phase_ended = False in __init__ or at the start of run.
  2. In _run_phases, set self.planning_phase_ended = True immediately after every call to task_logger.end_phase().
  3. In this except block, check if not self.planning_phase_ended.


    async def _run_phases(
        self,
        interactive: bool,
        auto_approve: bool,
-        task_logger,
-        ui,
+        task_logger: TaskLogger,
+        ui: types.ModuleType,
    ) -> bool:
        """Internal method that runs all spec creation phases.
@@ -327,6 +334,7 @@ def run_phase(name: str, phase_fn: Callable) -> phases.PhaseResult:
        results.append(result)
        if not result.success:
            print_status("Discovery failed", "error")
            self._planning_phase_ended = True
            task_logger.end_phase(
                LogPhase.PLANNING, success=False, message="Discovery failed"
            )
@@ -342,6 +350,7 @@ def run_phase(name: str, phase_fn: Callable) -> phases.PhaseResult:
        results.append(result)
        if not result.success:
            print_status("Requirements gathering failed", "error")
            self._planning_phase_ended = True
            task_logger.end_phase(
                LogPhase.PLANNING,
                success=False,
@@ -380,6 +389,7 @@ def run_phase(name: str, phase_fn: Callable) -> phases.PhaseResult:
        results.append(result)
        if not result.success:
            print_status("Complexity assessment failed", "error")
            self._planning_phase_ended = True
            task_logger.end_phase(
                LogPhase.PLANNING, success=False, message="Complexity assessment failed"
            )
@@ -442,6 +452,7 @@ def run_phase(name: str, phase_fn: Callable) -> phases.PhaseResult:
                    f"Phase '{phase_name}' failed: {'; '.join(result.errors)}",
                    LogEntryType.ERROR,
                )
                self._planning_phase_ended = True
                task_logger.end_phase(
                    LogPhase.PLANNING,
                    success=False,
@@ -456,6 +467,7 @@ def run_phase(name: str, phase_fn: Callable) -> phases.PhaseResult:
        self._print_completion_summary(results, phases_executed)

        # End planning phase successfully
        self._planning_phase_ended = True
        task_logger.end_phase(
            LogPhase.PLANNING, success=True, message="Spec creation complete"
        )
@@ -729,9 +741,8 @@ def _run_review_checkpoint(self, auto_approve: bool) -> bool:
            print_status("Build will not proceed without approval.", "warning")
            return False

-        except SystemExit as e:
-            if e.code != 0:
-                return False
+        except SystemExit:
+            # Review checkpoint may call sys.exit(); treat any exit as unapproved
+            return False
Comment on lines +744 to 746
⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

```bash
#!/bin/bash
# Find the file and examine the context around lines 744-746
cat -n apps/backend/spec/pipeline/orchestrator.py | sed -n '730,760p'
```

Repository: AndyMik90/Auto-Claude

Length of output: 1362


🏁 Script executed:

```bash
#!/bin/bash
# Search for run_review_checkpoint function definition and usage
rg -n 'def run_review_checkpoint' --type py -A 15
```

Repository: AndyMik90/Auto-Claude

Length of output: 1136


🏁 Script executed:

```bash
#!/bin/bash
# Search for sys.exit patterns in the codebase
rg -n 'sys\.exit' --type py -B 2 -A 2
```

Repository: AndyMik90/Auto-Claude

Length of output: 40968


Fix except SystemExit to differentiate between exit codes.

The bare except SystemExit: catches both sys.exit(0) and sys.exit(1) from run_review_checkpoint identically and returns False (unapproved). However, run_review_checkpoint calls sys.exit(0) in normal save/continue scenarios (reviewer.py:273, 337), which should not be treated as rejection. Inspect e.code to handle successful exits (0) separately from errors (1).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/backend/spec/pipeline/orchestrator.py` around lines 744 - 746, The
except SystemExit block in orchestrator.py should inspect the SystemExit
exception code from run_review_checkpoint rather than treating all exits as
unapproved; change the handler to catch SystemExit as e and return True when
e.code == 0 (normal save/continue), and return False (or propagate) for
non-zero/error codes (e.code == 1). Update the except block (the handler around
run_review_checkpoint) to check getattr(e, "code", 1) == 0 to locate successful
exits emitted by reviewer.py (see run_review_checkpoint and sys.exit calls) and
handle them accordingly.
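The exit-code-aware handling the finding describes can be sketched with a stand-in checkpoint (the functions here are illustrative, not the real `run_review_checkpoint`):

```python
import sys


def fake_checkpoint(exit_code: int) -> None:
    """Stand-in for a review checkpoint that terminates via sys.exit()."""
    sys.exit(exit_code)


def checkpoint_approved(exit_code: int) -> bool:
    """Treat sys.exit(0) as approval; any other exit as unapproved."""
    try:
        fake_checkpoint(exit_code)
        return True  # Returned normally without exiting
    except SystemExit as e:
        # sys.exit(0) sets e.code to 0; sys.exit() with no argument sets None
        return getattr(e, "code", 1) == 0


print(checkpoint_approved(0))  # True  - normal save/continue path
print(checkpoint_approved(1))  # False - error exit, build not approved
```

This preserves the original distinction the bare `except SystemExit:` discarded: a clean `sys.exit(0)` from the reviewer's save/continue path no longer reads as a rejection.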

        except KeyboardInterrupt:
            print()