Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions backend/app/api/endpoints/adapter/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,33 @@ def get_pipeline_stage_info(
)


@router.post("/{task_id}/skip-stage-confirmation", response_model=ConfirmStageResponse)
def skip_pipeline_stage_confirmation(
    task_id: int = Depends(with_task_telemetry),
    current_user: User = Depends(security.get_current_user),
    db: Session = Depends(get_db),
):
    """
    Advance a pipeline task to its next stage without a manual confirmation step.

    This endpoint is a thin HTTP wrapper: all work is delegated to
    ``task_kinds_service.skip_pipeline_stage_confirmation``. The result of the
    last completed stage is carried forward as context for the next stage, and
    an AI summary is generated when that result exceeds the configured
    length threshold.

    Args:
        task_id: Task ID (resolved through the task telemetry dependency)
        current_user: Current authenticated user; only its ``id`` is forwarded
        db: Database session

    Returns:
        ConfirmStageResponse with stage info
    """
    requesting_user_id = current_user.id
    outcome = task_kinds_service.skip_pipeline_stage_confirmation(
        db=db,
        task_id=task_id,
        user_id=requesting_user_id,
    )
    return outcome


@router.post("/{task_id}/share", response_model=TaskShareResponse)
def share_task(
task_id: int,
Expand Down
6 changes: 6 additions & 0 deletions backend/app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,12 @@ class Settings(BaseSettings):
# See backend/app/services/tables/DATA_TABLE_CONFIG_EXAMPLE.md for details
DATA_TABLE_CONFIG: str = ""

# Pipeline context settings for the "skip stage confirmation" feature.
# Threshold, in characters: when the previous stage's output is longer than
# this, it is summarized by an LLM (or truncated as a fallback) before being
# handed to the next stage.
PIPELINE_CONTEXT_MAX_LENGTH: int = 4000
# Default approximate target size, in characters, of the AI-generated summary;
# used by the summarizer when no explicit target length is supplied.
PIPELINE_SUMMARY_MAX_LENGTH: int = 2000

# OpenTelemetry configuration is centralized in shared/telemetry/config.py
# Use: from shared.telemetry.config import get_otel_config
# All OTEL_* environment variables are read from there
Expand Down
16 changes: 15 additions & 1 deletion backend/app/services/adapters/executor_kinds.py
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,8 @@ def _format_subtasks_response(
aggregated_prompt = ""
# Check if this subtask has a confirmed_prompt from stage confirmation
confirmed_prompt_from_stage = None
# Check if this subtask has context from skip confirmation
context_from_skip = None
# Flag to indicate this subtask should start a new session (no conversation history)
# This is used in pipeline mode when user confirms a stage and proceeds to next bot
new_session = False
Expand All @@ -811,8 +813,20 @@ def _format_subtasks_response(
# Clear the temporary result so it doesn't interfere with execution
subtask.result = None
subtask.updated_at = datetime.now()
elif subtask.result.get("from_skip_confirmation"):
# Handle skip confirmation - use context from previous stage
context_from_skip = subtask.result.get("context")
# Mark that this subtask should use a new session
new_session = True
# Clear the temporary result so it doesn't interfere with execution
subtask.result = None
subtask.updated_at = datetime.now()

if confirmed_prompt_from_stage:
if context_from_skip is not None:
# Use the context from skip confirmation
# Format it as previous stage output for the next bot
aggregated_prompt = f"Previous stage output:\n{context_from_skip}"
elif confirmed_prompt_from_stage:
# Use the confirmed prompt from stage confirmation instead of building from previous results
aggregated_prompt = confirmed_prompt_from_stage
else:
Expand Down
266 changes: 260 additions & 6 deletions backend/app/services/adapters/pipeline_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
checking if a stage requires confirmation, and managing stage transitions.
"""

import json
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional
Expand All @@ -17,6 +18,7 @@
from sqlalchemy.orm import Session
from sqlalchemy.orm.attributes import flag_modified

from app.core.config import settings
from app.models.kind import Kind
from app.models.shared_team import SharedTeam
from app.models.subtask import Subtask, SubtaskRole, SubtaskStatus
Expand Down Expand Up @@ -471,6 +473,7 @@ def _create_next_stage_subtask(
team_crd: Team,
next_stage_index: int,
confirmed_prompt: str,
from_skip_confirmation: bool = False,
) -> Optional[Subtask]:
"""
Create a subtask for the next pipeline stage.
Expand All @@ -481,7 +484,8 @@ def _create_next_stage_subtask(
task_crd: Task CRD object
team_crd: Team CRD object
next_stage_index: Index of the next stage (0-based)
confirmed_prompt: The confirmed prompt to pass to the next stage
confirmed_prompt: The confirmed prompt/context to pass to the next stage
from_skip_confirmation: If True, marks this as skip confirmation (context mode)

Returns:
The created Subtask object, or None if creation failed
Expand Down Expand Up @@ -560,6 +564,18 @@ def _create_next_stage_subtask(
executor_name = existing_assistant.executor_name or ""
executor_namespace = existing_assistant.executor_namespace or ""

# Build result based on confirmation type
if from_skip_confirmation:
result_data = {
"context": confirmed_prompt,
"from_skip_confirmation": True,
}
else:
result_data = {
"confirmed_prompt": confirmed_prompt,
"from_stage_confirmation": True,
}

# Create the new subtask for the next stage
new_subtask = Subtask(
user_id=last_subtask.user_id,
Expand All @@ -577,17 +593,15 @@ def _create_next_stage_subtask(
executor_namespace=executor_namespace,
error_message="",
completed_at=None,
result={
"confirmed_prompt": confirmed_prompt,
"from_stage_confirmation": True,
},
result=result_data,
)

db.add(new_subtask)
db.flush() # Get the new subtask ID

logger.info(
f"Pipeline confirm_stage: created subtask {new_subtask.id} for stage {next_stage_index} "
f"Pipeline {'skip' if from_skip_confirmation else 'confirm'}_stage: "
f"created subtask {new_subtask.id} for stage {next_stage_index} "
f"(bot={bot.name}, message_id={next_message_id})"
)

Expand Down Expand Up @@ -667,6 +681,246 @@ def get_team_for_task(

return team

def skip_stage_confirmation(
    self,
    db: Session,
    task: TaskResource,
    task_crd: Task,
    team_crd: Team,
) -> Dict[str, Any]:
    """
    Skip stage confirmation and proceed to the next stage using the last stage's result.

    When the current stage is the final one, the task is marked COMPLETED and
    no context is fetched. Otherwise the last completed stage's result is
    retrieved as context (it may be summarized by an LLM when too long), a
    subtask for the next stage is created in "skip confirmation" mode, and the
    task is put back to PENDING.

    Args:
        db: Database session
        task: Task resource object (its ``json`` column is rewritten from task_crd)
        task_crd: Task CRD object whose status is updated in place
        team_crd: Team CRD object describing the pipeline members

    Returns:
        Dict with keys ``message``, ``task_id``, ``current_stage``,
        ``total_stages`` and ``next_stage_name``. When the pipeline advances,
        ``current_stage`` holds the index of the stage that was just started;
        when the pipeline completes, ``next_stage_name`` is None.

    Raises:
        HTTPException: 500 if the subtask for the next stage could not be created.
    """
    # Get current pipeline stage info
    stage_info = self.get_stage_info(db, task.id, team_crd)
    current_stage = stage_info["current_stage"]
    total_stages = stage_info["total_stages"]
    next_stage = current_stage + 1

    if next_stage >= total_stages:
        # No more stages, mark task as completed. This check runs before the
        # context lookup so that a (potentially slow) LLM summarization is
        # never performed for a result nobody will consume.
        # A single timestamp keeps updatedAt/updated_at/completed_at in agreement.
        now = datetime.now()
        task_crd.status.status = "COMPLETED"
        task_crd.status.progress = 100
        task_crd.status.updatedAt = now
        task.json = task_crd.model_dump(mode="json", exclude_none=True)
        task.updated_at = now
        task.completed_at = now
        flag_modified(task, "json")
        db.commit()

        return {
            "message": "Pipeline completed",
            "task_id": task.id,
            "current_stage": current_stage,
            "total_stages": total_stages,
            "next_stage_name": None,
        }

    # Get the context from the last completed stage
    context = self.get_last_stage_result_as_context(db, task.id)
    if not context:
        # If no context available, use empty context
        context = ""
        logger.warning(
            f"Pipeline skip_stage_confirmation: no context available for task {task.id}"
        )

    # Reuse _create_next_stage_subtask with from_skip_confirmation=True
    next_subtask = self._create_next_stage_subtask(
        db,
        task,
        task_crd,
        team_crd,
        next_stage,
        context,
        from_skip_confirmation=True,
    )

    if not next_subtask:
        raise HTTPException(
            status_code=500,
            detail="Failed to create subtask for next pipeline stage",
        )

    # Update task status back to PENDING. The timestamp is taken here, after
    # the context lookup, so it reflects when the status actually changed.
    now = datetime.now()
    task_crd.status.status = "PENDING"
    task_crd.status.updatedAt = now
    task.json = task_crd.model_dump(mode="json", exclude_none=True)
    task.updated_at = now
    flag_modified(task, "json")
    db.commit()

    # Get next stage name (guarded in case members and stage count disagree)
    next_stage_name = None
    members = team_crd.spec.members
    if next_stage < len(members):
        next_stage_name = members[next_stage].botRef.name

    return {
        "message": "Stage skipped, proceeding to next stage",
        "task_id": task.id,
        "current_stage": next_stage,
        "total_stages": total_stages,
        "next_stage_name": next_stage_name,
    }

def get_last_stage_result_as_context(
    self,
    db: Session,
    task_id: int,
    max_length: Optional[int] = None,
) -> str:
    """
    Get the last completed stage's result as context for next stage.

    The most recent COMPLETED assistant subtask of the task (highest
    ``message_id``) is looked up and its ``result`` is converted to text.
    Text longer than ``max_length`` is summarized.

    Args:
        db: Database session
        task_id: Task ID
        max_length: Maximum character length before summarization (default from
            settings.PIPELINE_CONTEXT_MAX_LENGTH)

    Returns:
        Context string (original result or AI summary if too long). Empty
        string when there is no completed subtask or it has no result.
    """
    if max_length is None:
        max_length = settings.PIPELINE_CONTEXT_MAX_LENGTH

    # Get the last completed assistant subtask
    last_completed = (
        db.query(Subtask)
        .filter(
            Subtask.task_id == task_id,
            Subtask.role == SubtaskRole.ASSISTANT,
            Subtask.status == SubtaskStatus.COMPLETED,
        )
        .order_by(Subtask.message_id.desc())
        .first()
    )

    if not last_completed:
        logger.warning(
            f"Pipeline get_last_stage_result_as_context: no completed subtask for task {task_id}"
        )
        return ""

    # Extract result content
    result = last_completed.result
    if not result:
        return ""

    # Handle different result formats
    if isinstance(result, dict):
        # Use the first of the known text-bearing keys that is present; when
        # none is present the whole dict is serialized instead.
        # NOTE(review): confirm these keys match what executors actually write
        # into Subtask.result.
        extracted = next(
            (result[key] for key in ("text", "content", "message") if key in result),
            result,
        )
        # The extracted value is not guaranteed to be a string; normalize it so
        # the length check below cannot fail and the declared return type holds.
        if extracted is None:
            content = ""
        elif isinstance(extracted, str):
            content = extracted
        elif isinstance(extracted, (dict, list)):
            content = json.dumps(extracted, ensure_ascii=False, indent=2)
        else:
            content = str(extracted)
    elif isinstance(result, str):
        content = result
    else:
        content = str(result)

    # Check if content exceeds max length
    if len(content) > max_length:
        # The summary target is a separate setting from the threshold; cap it
        # by max_length so an explicitly small threshold is still respected.
        target_length = min(max_length, settings.PIPELINE_SUMMARY_MAX_LENGTH)
        logger.info(
            f"Pipeline context exceeds max length ({len(content)} > {max_length}), "
            f"summarizing for task {task_id}"
        )
        # Summarize using AI (synchronous call)
        content = self._summarize_context_sync(content, target_length)

    return content

def _summarize_context_sync(
    self, content: str, target_length: Optional[int] = None
) -> str:
    """
    Use AI to summarize long context content (synchronous version).

    Calls the configured LLM with a summarization prompt to compress
    the content while preserving key information. This is best-effort: if no
    LLM client is available, the call fails, or the LLM returns nothing, the
    content is truncated to ``target_length`` characters instead.

    The LLM coroutine is driven on a private event loop. If the calling thread
    already has a running event loop, ``run_until_complete`` raises; that error
    is caught here and the truncation fallback is used.

    Args:
        content: The content to summarize
        target_length: Target length for summary in characters (default from
            settings.PIPELINE_SUMMARY_MAX_LENGTH)

    Returns:
        Summarized content, or truncated content when summarization is
        unavailable
    """
    if target_length is None:
        target_length = settings.PIPELINE_SUMMARY_MAX_LENGTH

    def _truncated() -> str:
        # Single fallback for every "no summary" path: cut to the target
        # length and mark the cut, or return the content untouched if it
        # already fits.
        if len(content) > target_length:
            return content[:target_length] + "..."
        return content

    try:
        # Import here to avoid circular imports
        import asyncio

        from app.services.chat.llm_client import get_default_llm_client

        llm_client = get_default_llm_client()
        if not llm_client:
            logger.warning(
                "Pipeline context summarization: no LLM client available, truncating instead"
            )
            return _truncated()

        # Create summarization prompt
        system_prompt = (
            "You are a helpful assistant that summarizes text. "
            "Preserve key information and main points. "
            "Be concise and clear."
        )
        user_prompt = (
            f"Please summarize the following text to approximately {target_length} characters, "
            f"preserving the key information and main points:\n\n{content}"
        )

        # Call LLM for summarization (synchronous) on a dedicated loop that is
        # always closed, even when the call fails.
        # NOTE(review): assumes llm_client.complete() returns the summary text
        # as a string - confirm against the llm_client module.
        loop = asyncio.new_event_loop()
        try:
            summary = loop.run_until_complete(
                llm_client.complete(
                    messages=[
                        {"role": "system", "content": system_prompt},
                        {"role": "user", "content": user_prompt},
                    ],
                    max_tokens=target_length * 2,  # Allow some buffer
                )
            )
        finally:
            loop.close()

    except Exception as exc:
        # Deliberately broad: summarization must never break the pipeline.
        # logger.exception keeps the traceback for diagnosis.
        logger.exception("Pipeline context summarization failed: %s", exc)
        return _truncated()

    if not summary:
        logger.warning(
            "Pipeline context summarization: LLM returned an empty summary, truncating instead"
        )
        return _truncated()

    return summary


# Module-level singleton instance of PipelineStageService, created at import time
pipeline_stage_service = PipelineStageService()
Loading