27 changes: 27 additions & 0 deletions backend/app/api/endpoints/adapter/tasks.py
@@ -296,6 +296,33 @@ def get_pipeline_stage_info(
)


@router.post("/{task_id}/skip-stage-confirmation", response_model=ConfirmStageResponse)
def skip_pipeline_stage_confirmation(
task_id: int = Depends(with_task_telemetry),
current_user: User = Depends(security.get_current_user),
db: Session = Depends(get_db),
):
"""
Skip stage confirmation and proceed to the next stage using historical context.

Uses the last completed stage's result as context for the next stage.
If the result exceeds the configured length threshold (PIPELINE_CONTEXT_MAX_LENGTH), an AI summary is generated.

Args:
task_id: Task ID
current_user: Current authenticated user
db: Database session

Returns:
ConfirmStageResponse with stage info
"""
return task_kinds_service.skip_pipeline_stage_confirmation(
db=db,
task_id=task_id,
user_id=current_user.id,
)


@router.post("/{task_id}/share", response_model=TaskShareResponse)
def share_task(
task_id: int,
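For reference, a quick way to exercise the new route, sketched with FastAPI's TestClient. The application import path, router prefix, and authentication handling below are illustrative assumptions, not taken from this diff:

from fastapi.testclient import TestClient

from app.main import app  # assumed application entrypoint

client = TestClient(app)
# The "/api/tasks" prefix and task id 123 are hypothetical; a real request must
# also satisfy the security.get_current_user dependency (e.g. an auth header).
resp = client.post("/api/tasks/123/skip-stage-confirmation")
print(resp.status_code, resp.json())

A successful response is serialized through ConfirmStageResponse and carries the fields the service returns: message, task_id, current_stage, total_stages, and next_stage_name.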
6 changes: 6 additions & 0 deletions backend/app/core/config.py
@@ -259,6 +259,12 @@ class Settings(BaseSettings):
# See backend/app/services/tables/DATA_TABLE_CONFIG_EXAMPLE.md for details
DATA_TABLE_CONFIG: str = ""

# Pipeline context settings for skip confirmation feature
# Maximum character length before AI summarization is applied
PIPELINE_CONTEXT_MAX_LENGTH: int = 4000
# Target length for AI-generated summary
PIPELINE_SUMMARY_MAX_LENGTH: int = 2000

# OpenTelemetry configuration is centralized in shared/telemetry/config.py
# Use: from shared.telemetry.config import get_otel_config
# All OTEL_* environment variables are read from there
16 changes: 15 additions & 1 deletion backend/app/services/adapters/executor_kinds.py
@@ -799,6 +799,8 @@ def _format_subtasks_response(
aggregated_prompt = ""
# Check if this subtask has a confirmed_prompt from stage confirmation
confirmed_prompt_from_stage = None
# Check if this subtask has context from skip confirmation
context_from_skip = None
# Flag to indicate this subtask should start a new session (no conversation history)
# This is used in pipeline mode when user confirms a stage and proceeds to next bot
new_session = False
@@ -811,8 +813,20 @@
# Clear the temporary result so it doesn't interfere with execution
subtask.result = None
subtask.updated_at = datetime.now()
elif subtask.result.get("from_skip_confirmation"):
# Handle skip confirmation - use context from previous stage
context_from_skip = subtask.result.get("context")
# Mark that this subtask should use a new session
new_session = True
# Clear the temporary result so it doesn't interfere with execution
subtask.result = None
subtask.updated_at = datetime.now()

if confirmed_prompt_from_stage:
if context_from_skip is not None:
# Use the context from skip confirmation
# Format it as previous stage output for the next bot
aggregated_prompt = f"Previous stage output:\n{context_from_skip}"
elif confirmed_prompt_from_stage:
# Use the confirmed prompt from stage confirmation instead of building from previous results
aggregated_prompt = confirmed_prompt_from_stage
else:
264 changes: 258 additions & 6 deletions backend/app/services/adapters/pipeline_stage.py
@@ -9,6 +9,7 @@
checking if a stage requires confirmation, and managing stage transitions.
"""

import json
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional
@@ -17,6 +18,7 @@
from sqlalchemy.orm import Session
from sqlalchemy.orm.attributes import flag_modified

from app.core.config import settings
from app.models.kind import Kind
from app.models.shared_team import SharedTeam
from app.models.subtask import Subtask, SubtaskRole, SubtaskStatus
@@ -471,6 +473,7 @@ def _create_next_stage_subtask(
team_crd: Team,
next_stage_index: int,
confirmed_prompt: str,
from_skip_confirmation: bool = False,
) -> Optional[Subtask]:
"""
Create a subtask for the next pipeline stage.
@@ -481,7 +484,8 @@
task_crd: Task CRD object
team_crd: Team CRD object
next_stage_index: Index of the next stage (0-based)
confirmed_prompt: The confirmed prompt to pass to the next stage
confirmed_prompt: The confirmed prompt/context to pass to the next stage
from_skip_confirmation: If True, marks this as skip confirmation (context mode)

Returns:
The created Subtask object, or None if creation failed
@@ -560,6 +564,18 @@
executor_name = existing_assistant.executor_name or ""
executor_namespace = existing_assistant.executor_namespace or ""

# Build result based on confirmation type
if from_skip_confirmation:
result_data = {
"context": confirmed_prompt,
"from_skip_confirmation": True,
}
else:
result_data = {
"confirmed_prompt": confirmed_prompt,
"from_stage_confirmation": True,
}

# Create the new subtask for the next stage
new_subtask = Subtask(
user_id=last_subtask.user_id,
@@ -577,17 +593,15 @@
executor_namespace=executor_namespace,
error_message="",
completed_at=None,
result={
"confirmed_prompt": confirmed_prompt,
"from_stage_confirmation": True,
},
result=result_data,
)

db.add(new_subtask)
db.flush() # Get the new subtask ID

logger.info(
f"Pipeline confirm_stage: created subtask {new_subtask.id} for stage {next_stage_index} "
f"Pipeline {'skip' if from_skip_confirmation else 'confirm'}_stage: "
f"created subtask {new_subtask.id} for stage {next_stage_index} "
f"(bot={bot.name}, message_id={next_message_id})"
)

@@ -667,6 +681,244 @@ def get_team_for_task(

return team

def skip_stage_confirmation(
self,
db: Session,
task: TaskResource,
task_crd: Task,
team_crd: Team,
) -> Dict[str, Any]:
"""
Skip stage confirmation and proceed to the next stage using the last stage's result.

Gets the last completed stage's result as context and creates a new subtask
for the next stage. If the result is too long, it will be summarized using AI.

Args:
db: Database session
task: Task resource object
task_crd: Task CRD object
team_crd: Team CRD object

Returns:
Dict with skip result info
"""
# Get current pipeline stage info
stage_info = self.get_stage_info(db, task.id, team_crd)

# Get the context from the last completed stage
context = self.get_last_stage_result_as_context(db, task.id)

if not context:
# If no context available, use empty context
context = ""
logger.warning(
f"Pipeline skip_stage_confirmation: no context available for task {task.id}"
)

current_stage = stage_info["current_stage"]
next_stage = current_stage + 1

if next_stage >= stage_info["total_stages"]:
# No more stages, mark task as completed
task_crd.status.status = "COMPLETED"
task_crd.status.progress = 100
task_crd.status.updatedAt = datetime.now()
task.json = task_crd.model_dump(mode="json", exclude_none=True)
task.updated_at = datetime.now()
task.completed_at = datetime.now()
flag_modified(task, "json")
db.commit()

return {
"message": "Pipeline completed",
"task_id": task.id,
"current_stage": current_stage,
"total_stages": stage_info["total_stages"],
"next_stage_name": None,
}

# Reuse _create_next_stage_subtask with from_skip_confirmation=True
next_subtask = self._create_next_stage_subtask(
db,
task,
task_crd,
team_crd,
next_stage,
context,
from_skip_confirmation=True,
)

if not next_subtask:
raise HTTPException(
status_code=500,
detail="Failed to create subtask for next pipeline stage",
)

# Update task status back to PENDING
task_crd.status.status = "PENDING"
task_crd.status.updatedAt = datetime.now()
task.json = task_crd.model_dump(mode="json", exclude_none=True)
task.updated_at = datetime.now()
flag_modified(task, "json")
db.commit()

# Get next stage name
next_stage_name = None
if next_stage < len(team_crd.spec.members):
next_bot_ref = team_crd.spec.members[next_stage].botRef
next_stage_name = next_bot_ref.name

return {
"message": "Stage skipped, proceeding to next stage",
"task_id": task.id,
"current_stage": next_stage,
"total_stages": stage_info["total_stages"],
"next_stage_name": next_stage_name,
}

def get_last_stage_result_as_context(
self,
db: Session,
task_id: int,
max_length: int = None,
) -> str:
"""
Get the last completed stage's result as context for next stage.

Args:
db: Database session
task_id: Task ID
max_length: Maximum character length before summarization (default from settings)

Returns:
Context string (original result or AI summary if too long)
"""
if max_length is None:
max_length = settings.PIPELINE_CONTEXT_MAX_LENGTH

# Get the last completed assistant subtask
last_completed = (
db.query(Subtask)
.filter(
Subtask.task_id == task_id,
Subtask.role == SubtaskRole.ASSISTANT,
Subtask.status == SubtaskStatus.COMPLETED,
)
.order_by(Subtask.message_id.desc())
.first()
)

if not last_completed:
logger.warning(
f"Pipeline get_last_stage_result_as_context: no completed subtask for task {task_id}"
)
return ""

# Extract result content
result = last_completed.result
if not result:
return ""

# Handle different result formats
if isinstance(result, dict):
# If result is a dict, try to extract text content
if "text" in result:
content = result["text"]
elif "content" in result:
content = result["content"]
elif "message" in result:
content = result["message"]
else:
# Convert dict to JSON string
content = json.dumps(result, ensure_ascii=False, indent=2)
elif isinstance(result, str):
content = result
else:
content = str(result)

# Check if content exceeds max length
if len(content) > max_length:
logger.info(
f"Pipeline context exceeds max length ({len(content)} > {max_length}), "
f"summarizing for task {task_id}"
)
# Summarize using AI (synchronous call)
content = self._summarize_context_sync(content, max_length)

return content

def _summarize_context_sync(self, content: str, target_length: int = None) -> str:
"""
Use AI to summarize long context content (synchronous version).

Calls the configured LLM with a summarization prompt to compress
the content while preserving key information.

Args:
content: The content to summarize
target_length: Target length for summary (default from settings)

Returns:
Summarized content
"""
if target_length is None:
target_length = settings.PIPELINE_SUMMARY_MAX_LENGTH

try:
# Import here to avoid circular imports
from app.services.chat.llm_client import get_default_llm_client

llm_client = get_default_llm_client()
if not llm_client:
logger.warning(
"Pipeline context summarization: no LLM client available, truncating instead"
)
# Fallback to simple truncation
return (
content[:target_length] + "..."
if len(content) > target_length
else content
)

# Create summarization prompt
system_prompt = (
"You are a helpful assistant that summarizes text. "
"Preserve key information and main points. "
"Be concise and clear."
)
user_prompt = (
f"Please summarize the following text to approximately {target_length} characters, "
f"preserving the key information and main points:\n\n{content}"
)

# Call LLM for summarization (synchronous)
import asyncio

loop = asyncio.new_event_loop()
try:
summary = loop.run_until_complete(
llm_client.complete(
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
max_tokens=target_length * 2, # Allow some buffer
)
)
return summary if summary else content[:target_length]
finally:
loop.close()

except Exception as e:
logger.error(f"Pipeline context summarization failed: {str(e)}")
# Fallback to simple truncation
return (
content[:target_length] + "..."
if len(content) > target_length
else content
)

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Scripts executed (condensed; the raw outputs were collapsed in the original view):

  • find and cat to locate backend/app/services/adapters/pipeline_stage.py and inspect lines 800-930, and to read the Python version requirement from backend/pyproject.toml
  • rg to trace the call chain _summarize_context_sync → get_last_stage_result_as_context → skip_stage_confirmation → task_kinds_service.skip_pipeline_stage_confirmation → the sync handler in backend/app/api/endpoints/adapter/tasks.py (lines 299-324)
  • rg for asyncio.run and asyncio.new_event_loop usage across the backend, including backend/app/services/jobs.py (new_event_loop paired with set_event_loop) and backend/app/services/knowledge_service.py lines 716-726 (asyncio.get_running_loop() checked before running coroutines)
  • find and rg for the llm_client module, get_default_llm_client, and an async def complete signature under backend/app/services/chat/ and across the repository


Use a safe asyncio pattern to handle running event loops in sync context.

The asyncio.new_event_loop() / run_until_complete() pattern can fail at runtime if this sync method is ever invoked from a thread that already has a running event loop (for example, from an async code path): loop.run_until_complete() then raises a RuntimeError, since two loops cannot run in the same thread. The endpoint POST /{task_id}/skip-stage-confirmation is a sync handler, which FastAPI runs in a threadpool worker, so the current route likely works, but the helper is fragile if it is ever reused from an async context.

Instead, follow the pattern already used elsewhere in the codebase (e.g., knowledge_service.py lines 716-726):

  • Check for a running loop with asyncio.get_running_loop()
  • If none exists, safely use asyncio.run()
  • If one exists, run the coroutine in a separate thread via ThreadPoolExecutor (see the sketch below)

Additionally:

  • Update the type hint from int = None to int | None (Python 3.10+ syntax)
  • Replace logger.error(f"...{str(e)}") with logger.exception("...") to include the full stack trace for debugging
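
A minimal sketch of that pattern; the helper name _run_coro_blocking is illustrative, not existing project code:

import asyncio
from concurrent.futures import ThreadPoolExecutor


def _run_coro_blocking(coro):
    """Run a coroutine to completion from sync code, tolerating a live loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so asyncio.run() is safe.
        return asyncio.run(coro)
    # A loop is already running in this thread; hand the coroutine to a
    # separate thread that owns its own event loop.
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()

With that helper, the new_event_loop block in _summarize_context_sync collapses to summary = _run_coro_blocking(llm_client.complete(...)), the signature becomes target_length: int | None = None, and the except handler logs via logger.exception("Pipeline context summarization failed") so the stack trace is preserved.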
🧰 Tools
🪛 Ruff (0.14.10)

851-851: PEP 484 prohibits implicit Optional; convert to T | None (RUF013)
913-913: Do not catch blind exception: Exception (BLE001)
914-914: Use logging.exception instead of logging.error (TRY400)
914-914: Use explicit conversion flag (RUF010)

# Singleton instance
pipeline_stage_service = PipelineStageService()