From d21eeae26aa409b27e1527d86a85fc02ce216874 Mon Sep 17 00:00:00 2001 From: qdaxb <4157870+qdaxb@users.noreply.github.com> Date: Sun, 11 Jan 2026 01:46:49 +0800 Subject: [PATCH 01/11] =?UTF-8?q?feat(flow):=20add=20AI=20Flow=20(?= =?UTF-8?q?=E6=99=BA=E8=83=BD=E6=B5=81)=20module=20for=20automated=20task?= =?UTF-8?q?=20execution?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit introduces the AI Flow module which enables automated task execution with configurable triggers. Key features include: Backend: - Flow CRD schema with support for cron, interval, one-time, and event triggers - FlowResource and FlowExecution database models with proper indexes - Flow service layer for CRUD operations and execution management - Flow scheduler for periodic execution of scheduled flows - REST API endpoints for flow management and timeline view - Webhook endpoint for external trigger support Frontend: - Flow types and API client - FlowContext for state management - FlowList component for configuration management - FlowTimeline component for execution history display - FlowForm component for creating/editing flows - FlowPage main component with tab navigation - i18n translations (zh-CN and en) - Navigation entry in DesktopNavLinks Database: - flows table for storing flow configurations - flow_executions table for execution records - Proper indexes for scheduler queries Co-Authored-By: Claude --- .../versions/q7r8s9t0u1v2_add_flow_tables.py | 150 +++ backend/app/api/api.py | 2 + backend/app/api/endpoints/adapter/flows.py | 257 +++++ backend/app/models/flow.py | 138 +++ backend/app/schemas/flow.py | 322 ++++++ backend/app/services/flow.py | 964 ++++++++++++++++++ backend/app/services/flow_scheduler.py | 389 +++++++ backend/app/services/jobs.py | 7 + backend/tests/api/endpoints/test_flows.py | 287 ++++++ frontend/src/apis/flow.ts | 126 +++ frontend/src/app/flow/page.tsx | 9 + frontend/src/config/paths.ts | 3 + 
.../features/flows/components/FlowForm.tsx | 513 ++++++++++ .../features/flows/components/FlowList.tsx | 307 ++++++ .../features/flows/components/FlowPage.tsx | 108 ++ .../flows/components/FlowTimeline.tsx | 278 +++++ .../src/features/flows/components/index.ts | 8 + .../features/flows/contexts/flowContext.tsx | 224 ++++ .../layout/components/DesktopNavLinks.tsx | 12 +- frontend/src/i18n/locales/en/common.json | 1 + frontend/src/i18n/locales/en/flow.json | 81 ++ frontend/src/i18n/locales/zh-CN/common.json | 1 + frontend/src/i18n/locales/zh-CN/flow.json | 81 ++ frontend/src/i18n/setup.ts | 2 + frontend/src/types/flow.ts | 160 +++ 25 files changed, 4429 insertions(+), 1 deletion(-) create mode 100644 backend/alembic/versions/q7r8s9t0u1v2_add_flow_tables.py create mode 100644 backend/app/api/endpoints/adapter/flows.py create mode 100644 backend/app/models/flow.py create mode 100644 backend/app/schemas/flow.py create mode 100644 backend/app/services/flow.py create mode 100644 backend/app/services/flow_scheduler.py create mode 100644 backend/tests/api/endpoints/test_flows.py create mode 100644 frontend/src/apis/flow.ts create mode 100644 frontend/src/app/flow/page.tsx create mode 100644 frontend/src/features/flows/components/FlowForm.tsx create mode 100644 frontend/src/features/flows/components/FlowList.tsx create mode 100644 frontend/src/features/flows/components/FlowPage.tsx create mode 100644 frontend/src/features/flows/components/FlowTimeline.tsx create mode 100644 frontend/src/features/flows/components/index.ts create mode 100644 frontend/src/features/flows/contexts/flowContext.tsx create mode 100644 frontend/src/i18n/locales/en/flow.json create mode 100644 frontend/src/i18n/locales/zh-CN/flow.json create mode 100644 frontend/src/types/flow.ts diff --git a/backend/alembic/versions/q7r8s9t0u1v2_add_flow_tables.py b/backend/alembic/versions/q7r8s9t0u1v2_add_flow_tables.py new file mode 100644 index 000000000..22d62cbfb --- /dev/null +++ 
b/backend/alembic/versions/q7r8s9t0u1v2_add_flow_tables.py @@ -0,0 +1,150 @@ +# SPDX-FileCopyrightText: 2025 Weibo, Inc. +# +# SPDX-License-Identifier: Apache-2.0 + +""" +Add flows and flow_executions tables for AI Flow module. + +Revision ID: q7r8s9t0u1v2 +Revises: p6q7r8s9t0u1 +Create Date: 2025-01-10 10:00:00.000000 + +This migration creates the flows and flow_executions tables for the AI Flow +(智能流) module, which enables automated task execution with various trigger types. +""" + +import sqlalchemy as sa +from sqlalchemy.dialects import mysql + +from alembic import op + +# revision identifiers, used by Alembic. +revision = "q7r8s9t0u1v2" +down_revision = "p6q7r8s9t0u1" +branch_labels = None +depends_on = None + + +def upgrade(): + """Create flows and flow_executions tables.""" + # 1. Create flows table + op.create_table( + "flows", + sa.Column("id", sa.Integer(), autoincrement=True, nullable=False), + sa.Column("user_id", sa.Integer(), nullable=False), + sa.Column("kind", sa.String(50), nullable=False, server_default="Flow"), + sa.Column("name", sa.String(255), nullable=False), + sa.Column("namespace", sa.String(255), nullable=False, server_default="default"), + sa.Column("json", mysql.JSON(), nullable=False), + sa.Column("is_active", sa.Boolean(), nullable=False, server_default="1"), + sa.Column("enabled", sa.Boolean(), nullable=False, server_default="1"), + sa.Column("trigger_type", sa.String(50), nullable=False), + sa.Column("team_id", sa.Integer(), nullable=True), + sa.Column("workspace_id", sa.Integer(), nullable=True), + sa.Column("webhook_token", sa.String(255), nullable=True), + sa.Column("last_execution_time", sa.DateTime(), nullable=True), + sa.Column("last_execution_status", sa.String(50), nullable=True), + sa.Column("next_execution_time", sa.DateTime(), nullable=True), + sa.Column("execution_count", sa.Integer(), nullable=False, server_default="0"), + sa.Column("success_count", sa.Integer(), nullable=False, server_default="0"), + 
sa.Column("failure_count", sa.Integer(), nullable=False, server_default="0"), + sa.Column( + "created_at", + sa.DateTime(), + nullable=False, + server_default=sa.func.now(), + ), + sa.Column( + "updated_at", + sa.DateTime(), + nullable=False, + server_default=sa.func.now(), + onupdate=sa.func.now(), + ), + sa.PrimaryKeyConstraint("id"), + ) + + # Create indexes for flows table + op.create_index("ix_flows_user_id", "flows", ["user_id"]) + op.create_index("ix_flows_enabled", "flows", ["enabled"]) + op.create_index("ix_flows_trigger_type", "flows", ["trigger_type"]) + op.create_index("ix_flows_team_id", "flows", ["team_id"]) + op.create_index("ix_flows_next_execution_time", "flows", ["next_execution_time"]) + op.create_index( + "ix_flows_user_kind_name_ns", + "flows", + ["user_id", "kind", "name", "namespace"], + unique=True, + ) + op.create_index("ix_flows_enabled_next_exec", "flows", ["enabled", "next_execution_time"]) + op.create_index("ix_flows_user_active", "flows", ["user_id", "is_active"]) + op.create_index("ix_flows_webhook_token", "flows", ["webhook_token"], unique=True) + + # 2. 
Create flow_executions table + op.create_table( + "flow_executions", + sa.Column("id", sa.Integer(), autoincrement=True, nullable=False), + sa.Column("user_id", sa.Integer(), nullable=False), + sa.Column("flow_id", sa.Integer(), nullable=False), + sa.Column("task_id", sa.Integer(), nullable=True), + sa.Column("trigger_type", sa.String(50), nullable=False), + sa.Column("trigger_reason", sa.String(500), nullable=True), + sa.Column("prompt", sa.Text(), nullable=False), + sa.Column("status", sa.String(50), nullable=False, server_default="PENDING"), + sa.Column("result_summary", sa.Text(), nullable=True), + sa.Column("error_message", sa.Text(), nullable=True), + sa.Column("retry_attempt", sa.Integer(), nullable=False, server_default="0"), + sa.Column("started_at", sa.DateTime(), nullable=True), + sa.Column("completed_at", sa.DateTime(), nullable=True), + sa.Column( + "created_at", + sa.DateTime(), + nullable=False, + server_default=sa.func.now(), + ), + sa.Column( + "updated_at", + sa.DateTime(), + nullable=False, + server_default=sa.func.now(), + onupdate=sa.func.now(), + ), + sa.PrimaryKeyConstraint("id"), + sa.ForeignKeyConstraint(["flow_id"], ["flows.id"], ondelete="CASCADE"), + ) + + # Create indexes for flow_executions table + op.create_index("ix_flow_exec_user_id", "flow_executions", ["user_id"]) + op.create_index("ix_flow_exec_flow_id", "flow_executions", ["flow_id"]) + op.create_index("ix_flow_exec_task_id", "flow_executions", ["task_id"]) + op.create_index("ix_flow_exec_status", "flow_executions", ["status"]) + op.create_index("ix_flow_exec_created_at", "flow_executions", ["created_at"]) + op.create_index("ix_flow_exec_user_created", "flow_executions", ["user_id", "created_at"]) + op.create_index("ix_flow_exec_flow_created", "flow_executions", ["flow_id", "created_at"]) + op.create_index("ix_flow_exec_user_status", "flow_executions", ["user_id", "status"]) + + +def downgrade(): + """Drop flows and flow_executions tables.""" + # Drop flow_executions table and 
its indexes + op.drop_index("ix_flow_exec_user_status", table_name="flow_executions") + op.drop_index("ix_flow_exec_flow_created", table_name="flow_executions") + op.drop_index("ix_flow_exec_user_created", table_name="flow_executions") + op.drop_index("ix_flow_exec_created_at", table_name="flow_executions") + op.drop_index("ix_flow_exec_status", table_name="flow_executions") + op.drop_index("ix_flow_exec_task_id", table_name="flow_executions") + op.drop_index("ix_flow_exec_flow_id", table_name="flow_executions") + op.drop_index("ix_flow_exec_user_id", table_name="flow_executions") + op.drop_table("flow_executions") + + # Drop flows table and its indexes + op.drop_index("ix_flows_webhook_token", table_name="flows") + op.drop_index("ix_flows_user_active", table_name="flows") + op.drop_index("ix_flows_enabled_next_exec", table_name="flows") + op.drop_index("ix_flows_user_kind_name_ns", table_name="flows") + op.drop_index("ix_flows_next_execution_time", table_name="flows") + op.drop_index("ix_flows_team_id", table_name="flows") + op.drop_index("ix_flows_trigger_type", table_name="flows") + op.drop_index("ix_flows_enabled", table_name="flows") + op.drop_index("ix_flows_user_id", table_name="flows") + op.drop_table("flows") diff --git a/backend/app/api/api.py b/backend/app/api/api.py index d4c6956ee..c1575b346 100644 --- a/backend/app/api/api.py +++ b/backend/app/api/api.py @@ -28,6 +28,7 @@ chat, dify, executors, + flows, models, retrievers, shells, @@ -59,6 +60,7 @@ api_router.include_router(shells.router, prefix="/shells", tags=["shells"]) api_router.include_router(agents.router, prefix="/agents", tags=["public-shell"]) api_router.include_router(teams.router, prefix="/teams", tags=["teams"]) +api_router.include_router(flows.router, prefix="/flows", tags=["flows"]) api_router.include_router(tasks.router, prefix="/tasks", tags=["tasks"]) api_router.include_router(subtasks.router, prefix="/subtasks", tags=["subtasks"]) api_router.include_router(task_members.router, 
prefix="/tasks", tags=["task-members"]) diff --git a/backend/app/api/endpoints/adapter/flows.py b/backend/app/api/endpoints/adapter/flows.py new file mode 100644 index 000000000..4b7b0adf1 --- /dev/null +++ b/backend/app/api/endpoints/adapter/flows.py @@ -0,0 +1,257 @@ +# SPDX-FileCopyrightText: 2025 Weibo, Inc. +# +# SPDX-License-Identifier: Apache-2.0 + +""" +API endpoints for AI Flow (智能流) module. +""" +import logging +from datetime import datetime +from typing import Any, Dict, List, Optional + +from fastapi import APIRouter, Depends, HTTPException, Query, status +from sqlalchemy.orm import Session + +from app.api.dependencies import get_db +from app.core import security +from app.models.user import User +from app.schemas.flow import ( + FlowCreate, + FlowExecutionInDB, + FlowExecutionListResponse, + FlowExecutionStatus, + FlowInDB, + FlowListResponse, + FlowTriggerType, + FlowUpdate, +) +from app.services.flow import flow_service + +logger = logging.getLogger(__name__) + +router = APIRouter() + + +@router.get("", response_model=FlowListResponse) +def list_flows( + page: int = Query(1, ge=1, description="Page number"), + limit: int = Query(20, ge=1, le=100, description="Items per page"), + enabled: Optional[bool] = Query(None, description="Filter by enabled status"), + trigger_type: Optional[FlowTriggerType] = Query( + None, description="Filter by trigger type" + ), + db: Session = Depends(get_db), + current_user: User = Depends(security.get_current_user), +): + """ + List current user's Flow configurations. + + Returns paginated list of Flows with support for filtering by enabled status + and trigger type. 
+ """ + skip = (page - 1) * limit + + items, total = flow_service.list_flows( + db=db, + user_id=current_user.id, + skip=skip, + limit=limit, + enabled=enabled, + trigger_type=trigger_type, + ) + + return FlowListResponse(total=total, items=items) + + +@router.post("", response_model=FlowInDB, status_code=status.HTTP_201_CREATED) +def create_flow( + flow_in: FlowCreate, + db: Session = Depends(get_db), + current_user: User = Depends(security.get_current_user), +): + """ + Create a new Flow configuration. + + The Flow will be created with the specified trigger configuration and + associated with the given Team (Agent). + """ + return flow_service.create_flow( + db=db, + flow_in=flow_in, + user_id=current_user.id, + ) + + +@router.get("/{flow_id}", response_model=FlowInDB) +def get_flow( + flow_id: int, + db: Session = Depends(get_db), + current_user: User = Depends(security.get_current_user), +): + """ + Get a specific Flow configuration by ID. + """ + return flow_service.get_flow( + db=db, + flow_id=flow_id, + user_id=current_user.id, + ) + + +@router.put("/{flow_id}", response_model=FlowInDB) +def update_flow( + flow_id: int, + flow_in: FlowUpdate, + db: Session = Depends(get_db), + current_user: User = Depends(security.get_current_user), +): + """ + Update an existing Flow configuration. + + Any fields not provided will retain their current values. + """ + return flow_service.update_flow( + db=db, + flow_id=flow_id, + flow_in=flow_in, + user_id=current_user.id, + ) + + +@router.delete("/{flow_id}", status_code=status.HTTP_204_NO_CONTENT) +def delete_flow( + flow_id: int, + db: Session = Depends(get_db), + current_user: User = Depends(security.get_current_user), +): + """ + Delete a Flow configuration (soft delete). + + The Flow will be marked as inactive and disabled. 
+ """ + flow_service.delete_flow( + db=db, + flow_id=flow_id, + user_id=current_user.id, + ) + + +@router.post("/{flow_id}/toggle", response_model=FlowInDB) +def toggle_flow( + flow_id: int, + enabled: bool = Query(..., description="Enable or disable the flow"), + db: Session = Depends(get_db), + current_user: User = Depends(security.get_current_user), +): + """ + Enable or disable a Flow. + + When enabled, scheduled flows will resume executing according to their + trigger configuration. When disabled, no new executions will be triggered. + """ + return flow_service.toggle_flow( + db=db, + flow_id=flow_id, + user_id=current_user.id, + enabled=enabled, + ) + + +@router.post("/{flow_id}/trigger", response_model=FlowExecutionInDB) +def trigger_flow( + flow_id: int, + db: Session = Depends(get_db), + current_user: User = Depends(security.get_current_user), +): + """ + Manually trigger a Flow execution. + + Creates a new execution record and queues the task for immediate execution. + This is useful for testing flows or running them outside their normal schedule. + """ + return flow_service.trigger_flow_manually( + db=db, + flow_id=flow_id, + user_id=current_user.id, + ) + + +# ========== Webhook Trigger Endpoint ========== + + +@router.post("/webhook/{webhook_token}", response_model=FlowExecutionInDB) +def trigger_flow_webhook( + webhook_token: str, + payload: Dict[str, Any] = {}, + db: Session = Depends(get_db), +): + """ + Trigger a Flow via webhook. + + This endpoint is called by external systems to trigger event-based flows. + The payload will be available as {{webhook_data}} in the prompt template. + + No authentication required - the webhook_token acts as the secret. 
+ """ + return flow_service.trigger_flow_by_webhook( + db=db, + webhook_token=webhook_token, + payload=payload, + ) + + +# ========== Execution History Endpoints (Timeline) ========== + + +@router.get("/executions", response_model=FlowExecutionListResponse) +def list_executions( + page: int = Query(1, ge=1, description="Page number"), + limit: int = Query(50, ge=1, le=100, description="Items per page"), + flow_id: Optional[int] = Query(None, description="Filter by flow ID"), + status: Optional[List[FlowExecutionStatus]] = Query( + None, description="Filter by execution status" + ), + start_date: Optional[datetime] = Query(None, description="Filter by start date"), + end_date: Optional[datetime] = Query(None, description="Filter by end date"), + db: Session = Depends(get_db), + current_user: User = Depends(security.get_current_user), +): + """ + List Flow execution history (timeline view). + + Returns paginated list of execution records sorted by creation time (newest first). + Supports filtering by flow, status, and date range. + """ + skip = (page - 1) * limit + + items, total = flow_service.list_executions( + db=db, + user_id=current_user.id, + skip=skip, + limit=limit, + flow_id=flow_id, + status=status, + start_date=start_date, + end_date=end_date, + ) + + return FlowExecutionListResponse(total=total, items=items) + + +@router.get("/executions/{execution_id}", response_model=FlowExecutionInDB) +def get_execution( + execution_id: int, + db: Session = Depends(get_db), + current_user: User = Depends(security.get_current_user), +): + """ + Get a specific Flow execution by ID. + + Returns detailed information about the execution including the resolved prompt + and any result/error messages. 
+ """ + return flow_service.get_execution( + db=db, + execution_id=execution_id, + user_id=current_user.id, + ) diff --git a/backend/app/models/flow.py b/backend/app/models/flow.py new file mode 100644 index 000000000..09a8a9bf2 --- /dev/null +++ b/backend/app/models/flow.py @@ -0,0 +1,138 @@ +# SPDX-FileCopyrightText: 2025 Weibo, Inc. +# +# SPDX-License-Identifier: Apache-2.0 + +""" +Database model for AI Flow (智能流). +""" +from datetime import datetime + +from sqlalchemy import ( + JSON, + Boolean, + Column, + DateTime, + ForeignKey, + Index, + Integer, + String, + Text, +) + +from app.db.base_class import Base + + +class FlowResource(Base): + """ + Flow resource table for storing AI Flow configurations. + + Similar to the Kind table pattern but optimized for Flow resources + with additional scheduling-specific fields. + """ + + __tablename__ = "flows" + + id = Column(Integer, primary_key=True, autoincrement=True) + user_id = Column(Integer, nullable=False, index=True) + + # Resource identification (CRD-style) + kind = Column(String(50), default="Flow", nullable=False) + name = Column(String(255), nullable=False) + namespace = Column(String(255), default="default", nullable=False) + + # Full CRD JSON storage + json = Column(JSON, nullable=False) + + # Status flags + is_active = Column(Boolean, default=True, nullable=False) + + # Scheduling fields (denormalized for efficient queries) + enabled = Column(Boolean, default=True, nullable=False, index=True) + trigger_type = Column(String(50), nullable=False, index=True) + team_id = Column(Integer, nullable=True, index=True) + workspace_id = Column(Integer, nullable=True) + + # Webhook support + webhook_token = Column(String(255), nullable=True, unique=True) + + # Execution statistics + last_execution_time = Column(DateTime, nullable=True) + last_execution_status = Column(String(50), nullable=True) + next_execution_time = Column(DateTime, nullable=True, index=True) + execution_count = Column(Integer, default=0, 
nullable=False) + success_count = Column(Integer, default=0, nullable=False) + failure_count = Column(Integer, default=0, nullable=False) + + # Timestamps + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) + updated_at = Column( + DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False + ) + + __table_args__ = ( + # Unique constraint: (user_id, kind, name, namespace) for CRD-style uniqueness + Index( + "ix_flows_user_kind_name_ns", "user_id", "kind", "name", "namespace", unique=True + ), + # Index for scheduler queries: find enabled flows that need execution + Index("ix_flows_enabled_next_exec", "enabled", "next_execution_time"), + # Index for user's flows listing + Index("ix_flows_user_active", "user_id", "is_active"), + ) + + +class FlowExecution(Base): + """ + Flow execution records table. + + Stores each execution instance of a Flow, linking to the actual Task + that was created for the execution. + """ + + __tablename__ = "flow_executions" + + id = Column(Integer, primary_key=True, autoincrement=True) + user_id = Column(Integer, nullable=False, index=True) + + # Flow reference + flow_id = Column( + Integer, + ForeignKey("flows.id", ondelete="CASCADE"), + nullable=False, + index=True, + ) + + # Task reference (the actual task created for this execution) + task_id = Column(Integer, nullable=True, index=True) + + # Trigger information + trigger_type = Column(String(50), nullable=False) # cron, interval, webhook, etc. 
+ trigger_reason = Column(String(500), nullable=True) # Human-readable reason + + # Resolved prompt (with variables substituted) + prompt = Column(Text, nullable=False) + + # Execution status + status = Column(String(50), default="PENDING", nullable=False, index=True) + result_summary = Column(Text, nullable=True) + error_message = Column(Text, nullable=True) + + # Retry tracking + retry_attempt = Column(Integer, default=0, nullable=False) + + # Timing + started_at = Column(DateTime, nullable=True) + completed_at = Column(DateTime, nullable=True) + created_at = Column(DateTime, default=datetime.utcnow, nullable=False, index=True) + updated_at = Column( + DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False + ) + + __table_args__ = ( + # Index for timeline queries (recent executions) + Index("ix_flow_exec_user_created", "user_id", "created_at"), + # Index for flow execution history + Index("ix_flow_exec_flow_created", "flow_id", "created_at"), + # Index for status filtering + Index("ix_flow_exec_user_status", "user_id", "status"), + ) diff --git a/backend/app/schemas/flow.py b/backend/app/schemas/flow.py new file mode 100644 index 000000000..d6ff8e34c --- /dev/null +++ b/backend/app/schemas/flow.py @@ -0,0 +1,322 @@ +# SPDX-FileCopyrightText: 2025 Weibo, Inc. +# +# SPDX-License-Identifier: Apache-2.0 + +""" +AI Flow (智能流) CRD schemas for automated task execution. 
+""" +from datetime import datetime +from enum import Enum +from typing import Any, Dict, List, Optional + +from pydantic import BaseModel, Field + + +class FlowTaskType(str, Enum): + """Flow task type enumeration.""" + + EXECUTION = "execution" # 执行类任务 + COLLECTION = "collection" # 信息采集类任务 + + +class FlowTriggerType(str, Enum): + """Flow trigger type enumeration.""" + + CRON = "cron" # 定时计划 + INTERVAL = "interval" # 固定间隔 + ONE_TIME = "one_time" # 一次性定时 + EVENT = "event" # 事件触发 + + +class FlowEventType(str, Enum): + """Event trigger sub-type enumeration.""" + + WEBHOOK = "webhook" # Webhook 触发 + GIT_PUSH = "git_push" # Git Push 触发 + + +class FlowExecutionStatus(str, Enum): + """Flow execution status enumeration.""" + + PENDING = "PENDING" + RUNNING = "RUNNING" + COMPLETED = "COMPLETED" + FAILED = "FAILED" + RETRYING = "RETRYING" + CANCELLED = "CANCELLED" + + +# Trigger configuration schemas +class CronTriggerConfig(BaseModel): + """Cron trigger configuration.""" + + expression: str = Field(..., description="Cron expression (e.g., '0 9 * * *')") + timezone: str = Field("UTC", description="Timezone for cron execution") + + +class IntervalTriggerConfig(BaseModel): + """Interval trigger configuration.""" + + value: int = Field(..., description="Interval value") + unit: str = Field( + ..., description="Interval unit: 'minutes', 'hours', 'days'" + ) + + +class OneTimeTriggerConfig(BaseModel): + """One-time trigger configuration.""" + + execute_at: datetime = Field(..., description="Specific execution time (ISO format)") + + +class GitPushEventConfig(BaseModel): + """Git push event configuration.""" + + repository: str = Field(..., description="Repository URL or name") + branch: Optional[str] = Field(None, description="Branch to monitor (default: all)") + + +class EventTriggerConfig(BaseModel): + """Event trigger configuration.""" + + event_type: FlowEventType = Field(..., description="Event type: 'webhook' or 'git_push'") + git_push: Optional[GitPushEventConfig] = Field( 
+ None, description="Git push configuration (when event_type is 'git_push')" + ) + + +class FlowTriggerConfig(BaseModel): + """Flow trigger configuration.""" + + type: FlowTriggerType = Field(..., description="Trigger type") + cron: Optional[CronTriggerConfig] = Field( + None, description="Cron configuration (when type is 'cron')" + ) + interval: Optional[IntervalTriggerConfig] = Field( + None, description="Interval configuration (when type is 'interval')" + ) + one_time: Optional[OneTimeTriggerConfig] = Field( + None, description="One-time configuration (when type is 'one_time')" + ) + event: Optional[EventTriggerConfig] = Field( + None, description="Event configuration (when type is 'event')" + ) + + +# Reference schemas +class FlowTeamRef(BaseModel): + """Reference to a Team (Agent).""" + + name: str + namespace: str = "default" + + +class FlowWorkspaceRef(BaseModel): + """Reference to a Workspace (optional).""" + + name: str + namespace: str = "default" + + +# CRD spec and status +class FlowSpec(BaseModel): + """Flow CRD specification.""" + + displayName: str = Field(..., description="User-friendly display name") + taskType: FlowTaskType = Field( + FlowTaskType.COLLECTION, description="Task type: 'execution' or 'collection'" + ) + trigger: FlowTriggerConfig = Field(..., description="Trigger configuration") + teamRef: FlowTeamRef = Field(..., description="Reference to the Team (Agent)") + workspaceRef: Optional[FlowWorkspaceRef] = Field( + None, description="Reference to the Workspace (optional)" + ) + promptTemplate: str = Field( + ..., description="Prompt template with variable support ({{date}}, {{time}}, etc.)" + ) + retryCount: int = Field(0, ge=0, le=3, description="Retry count on failure (0-3)") + enabled: bool = Field(True, description="Whether the flow is enabled") + description: Optional[str] = Field(None, description="Flow description") + + +class FlowStatus(BaseModel): + """Flow CRD status.""" + + state: str = Field("Available", description="Flow 
state: 'Available', 'Unavailable'") + lastExecutionTime: Optional[datetime] = Field( + None, description="Last execution timestamp" + ) + lastExecutionStatus: Optional[FlowExecutionStatus] = Field( + None, description="Last execution status" + ) + nextExecutionTime: Optional[datetime] = Field( + None, description="Next scheduled execution time" + ) + webhookUrl: Optional[str] = Field( + None, description="Webhook URL (for event-webhook flows)" + ) + executionCount: int = Field(0, description="Total execution count") + successCount: int = Field(0, description="Successful execution count") + failureCount: int = Field(0, description="Failed execution count") + + +class FlowMetadata(BaseModel): + """Flow CRD metadata.""" + + name: str + namespace: str = "default" + displayName: Optional[str] = None + labels: Optional[Dict[str, str]] = None + + +class Flow(BaseModel): + """Flow CRD.""" + + apiVersion: str = "agent.wecode.io/v1" + kind: str = "Flow" + metadata: FlowMetadata + spec: FlowSpec + status: Optional[FlowStatus] = None + + +class FlowList(BaseModel): + """Flow list.""" + + apiVersion: str = "agent.wecode.io/v1" + kind: str = "FlowList" + items: List[Flow] + + +# API Request/Response schemas +class FlowBase(BaseModel): + """Base Flow model for API.""" + + name: str = Field(..., description="Flow unique identifier") + display_name: str = Field(..., description="Display name") + description: Optional[str] = Field(None, description="Flow description") + task_type: FlowTaskType = Field( + FlowTaskType.COLLECTION, description="Task type" + ) + trigger_type: FlowTriggerType = Field(..., description="Trigger type") + trigger_config: Dict[str, Any] = Field(..., description="Trigger configuration") + team_id: int = Field(..., description="Team (Agent) ID") + workspace_id: Optional[int] = Field(None, description="Workspace ID (optional)") + prompt_template: str = Field(..., description="Prompt template") + retry_count: int = Field(0, ge=0, le=3, description="Retry count 
(0-3)") + enabled: bool = Field(True, description="Whether enabled") + + +class FlowCreate(FlowBase): + """Flow creation model.""" + + namespace: str = Field("default", description="Namespace") + + +class FlowUpdate(BaseModel): + """Flow update model.""" + + display_name: Optional[str] = None + description: Optional[str] = None + task_type: Optional[FlowTaskType] = None + trigger_type: Optional[FlowTriggerType] = None + trigger_config: Optional[Dict[str, Any]] = None + team_id: Optional[int] = None + workspace_id: Optional[int] = None + prompt_template: Optional[str] = None + retry_count: Optional[int] = Field(None, ge=0, le=3) + enabled: Optional[bool] = None + + +class FlowInDB(FlowBase): + """Database Flow model.""" + + id: int + user_id: int + namespace: str = "default" + webhook_url: Optional[str] = None + last_execution_time: Optional[datetime] = None + last_execution_status: Optional[str] = None + next_execution_time: Optional[datetime] = None + execution_count: int = 0 + success_count: int = 0 + failure_count: int = 0 + created_at: datetime + updated_at: datetime + + class Config: + from_attributes = True + + +class FlowListResponse(BaseModel): + """Flow list response.""" + + total: int + items: List[FlowInDB] + + +# Flow Execution schemas +class FlowExecutionBase(BaseModel): + """Base Flow Execution model.""" + + flow_id: int + trigger_type: str = Field(..., description="What triggered this execution") + trigger_reason: str = Field(..., description="Human-readable trigger reason") + prompt: str = Field(..., description="Resolved prompt (with variables substituted)") + + +class FlowExecutionCreate(FlowExecutionBase): + """Flow Execution creation model.""" + + task_id: Optional[int] = Field(None, description="Associated Task ID") + + +class FlowExecutionInDB(FlowExecutionBase): + """Database Flow Execution model.""" + + id: int + user_id: int + task_id: Optional[int] = None + status: FlowExecutionStatus = FlowExecutionStatus.PENDING + result_summary: 
Optional[str] = None + error_message: Optional[str] = None + retry_attempt: int = 0 + started_at: Optional[datetime] = None + completed_at: Optional[datetime] = None + created_at: datetime + updated_at: datetime + # Joined fields for display + flow_name: Optional[str] = None + flow_display_name: Optional[str] = None + team_name: Optional[str] = None + task_type: Optional[str] = None + + class Config: + from_attributes = True + + +class FlowExecutionDetail(FlowExecutionInDB): + """Detailed Flow Execution with task info.""" + + task_detail: Optional[Dict[str, Any]] = None + + +class FlowExecutionListResponse(BaseModel): + """Flow Execution list response (timeline).""" + + total: int + items: List[FlowExecutionInDB] + + +# Timeline filter schemas +class FlowTimelineFilter(BaseModel): + """Filter options for flow timeline.""" + + time_range: Optional[str] = Field( + "7d", description="Time range: 'today', '7d', '30d', 'custom'" + ) + start_date: Optional[datetime] = None + end_date: Optional[datetime] = None + status: Optional[List[FlowExecutionStatus]] = None + flow_ids: Optional[List[int]] = None + team_ids: Optional[List[int]] = None + task_types: Optional[List[FlowTaskType]] = None diff --git a/backend/app/services/flow.py b/backend/app/services/flow.py new file mode 100644 index 000000000..2c9e3ce42 --- /dev/null +++ b/backend/app/services/flow.py @@ -0,0 +1,964 @@ +# SPDX-FileCopyrightText: 2025 Weibo, Inc. +# +# SPDX-License-Identifier: Apache-2.0 + +""" +AI Flow service layer for managing Flow configurations and executions. 
"""
AI Flow service layer for managing Flow configurations and executions.
"""
import logging
import re
import secrets
import uuid
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

from fastapi import HTTPException
from sqlalchemy import and_, desc, func, or_
from sqlalchemy.orm import Session
from sqlalchemy.orm.attributes import flag_modified

from app.models.flow import FlowExecution, FlowResource
from app.models.kind import Kind
from app.models.task import TaskResource
from app.schemas.flow import (
    Flow,
    FlowCreate,
    FlowExecutionCreate,
    FlowExecutionInDB,
    FlowExecutionStatus,
    FlowInDB,
    FlowMetadata,
    FlowSpec,
    FlowStatus,
    FlowTaskType,
    FlowTriggerConfig,
    FlowTriggerType,
    FlowUpdate,
)

logger = logging.getLogger(__name__)


class FlowService:
    """Service class for AI Flow operations."""

    # Supported prompt template variables; each value is a zero-argument
    # callable evaluated at substitution time (all based on UTC wall clock).
    TEMPLATE_VARIABLES = {
        "date": lambda: datetime.utcnow().strftime("%Y-%m-%d"),
        "time": lambda: datetime.utcnow().strftime("%H:%M:%S"),
        "datetime": lambda: datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"),
        "timestamp": lambda: str(int(datetime.utcnow().timestamp())),
    }

    def create_flow(
        self,
        db: Session,
        *,
        flow_in: FlowCreate,
        user_id: int,
    ) -> FlowInDB:
        """Create a new Flow configuration.

        Validates name uniqueness and the team/workspace references, builds
        the Flow CRD JSON, and persists a new FlowResource row.
        """
        # Reject duplicate active flow names within the user's namespace.
        duplicate = (
            db.query(FlowResource)
            .filter(
                FlowResource.user_id == user_id,
                FlowResource.kind == "Flow",
                FlowResource.name == flow_in.name,
                FlowResource.namespace == flow_in.namespace,
                FlowResource.is_active == True,
            )
            .first()
        )
        if duplicate:
            raise HTTPException(
                status_code=400,
                detail=f"Flow with name '{flow_in.name}' already exists",
            )

        # The referenced team must exist and be active.
        owning_team = (
            db.query(Kind)
            .filter(
                Kind.id == flow_in.team_id,
                Kind.kind == "Team",
                Kind.is_active == True,
            )
            .first()
        )
        if not owning_team:
            raise HTTPException(
                status_code=400,
                detail=f"Team with id {flow_in.team_id} not found",
            )

        # Workspace is optional; validate it only when provided.
        target_workspace = None
        if flow_in.workspace_id:
            target_workspace = (
                db.query(TaskResource)
                .filter(
                    TaskResource.id == flow_in.workspace_id,
                    TaskResource.kind == "Workspace",
                    TaskResource.is_active == True,
                )
                .first()
            )
            if not target_workspace:
                raise HTTPException(
                    status_code=400,
                    detail=f"Workspace with id {flow_in.workspace_id} not found",
                )

        # Event-triggered flows get a secret token for their webhook URL.
        token = (
            secrets.token_urlsafe(32)
            if flow_in.trigger_type == FlowTriggerType.EVENT
            else None
        )

        crd = self._build_flow_crd(flow_in, owning_team, target_workspace, token)
        next_run = self._calculate_next_execution_time(
            flow_in.trigger_type, flow_in.trigger_config
        )

        resource = FlowResource(
            user_id=user_id,
            kind="Flow",
            name=flow_in.name,
            namespace=flow_in.namespace,
            json=crd.model_dump(mode="json"),
            is_active=True,
            enabled=flow_in.enabled,
            trigger_type=flow_in.trigger_type.value,
            team_id=flow_in.team_id,
            workspace_id=flow_in.workspace_id,
            webhook_token=token,
            next_execution_time=next_run,
        )
        db.add(resource)
        db.commit()
        db.refresh(resource)
        return self._convert_to_flow_in_db(resource)

    def get_flow(
        self,
        db: Session,
        *,
        flow_id: int,
        user_id: int,
    ) -> FlowInDB:
        """Return a single active Flow owned by ``user_id``, or raise 404."""
        resource = (
            db.query(FlowResource)
            .filter(
                FlowResource.id == flow_id,
                FlowResource.user_id == user_id,
                FlowResource.is_active == True,
            )
            .first()
        )
        if resource is None:
            raise HTTPException(status_code=404, detail="Flow not found")
        return self._convert_to_flow_in_db(resource)

    def list_flows(
        self,
        db: Session,
        *,
        user_id: int,
        skip: int = 0,
        limit: int = 100,
        enabled: Optional[bool] = None,
        trigger_type: Optional[FlowTriggerType] = None,
    ) -> tuple[List[FlowInDB], int]:
        """List the user's active Flows, most recently updated first.

        Returns ``(items, total)`` where ``total`` ignores pagination.
        """
        query = db.query(FlowResource).filter(
            FlowResource.user_id == user_id,
            FlowResource.is_active == True,
        )
        if enabled is not None:
            query = query.filter(FlowResource.enabled == enabled)
        if trigger_type is not None:
            query = query.filter(FlowResource.trigger_type == trigger_type.value)

        total = query.count()
        page = (
            query.order_by(desc(FlowResource.updated_at))
            .offset(skip)
            .limit(limit)
            .all()
        )
        return [self._convert_to_flow_in_db(row) for row in page], total
db.query(FlowResource).filter( + FlowResource.user_id == user_id, + FlowResource.is_active == True, + ) + + if enabled is not None: + query = query.filter(FlowResource.enabled == enabled) + + if trigger_type is not None: + query = query.filter(FlowResource.trigger_type == trigger_type.value) + + total = query.count() + flows = ( + query.order_by(desc(FlowResource.updated_at)) + .offset(skip) + .limit(limit) + .all() + ) + + return [self._convert_to_flow_in_db(f) for f in flows], total + + def update_flow( + self, + db: Session, + *, + flow_id: int, + flow_in: FlowUpdate, + user_id: int, + ) -> FlowInDB: + """Update a Flow configuration.""" + flow = ( + db.query(FlowResource) + .filter( + FlowResource.id == flow_id, + FlowResource.user_id == user_id, + FlowResource.is_active == True, + ) + .first() + ) + + if not flow: + raise HTTPException(status_code=404, detail="Flow not found") + + flow_crd = Flow.model_validate(flow.json) + update_data = flow_in.model_dump(exclude_unset=True) + + # Update team reference if changed + if "team_id" in update_data: + team = ( + db.query(Kind) + .filter( + Kind.id == update_data["team_id"], + Kind.kind == "Team", + Kind.is_active == True, + ) + .first() + ) + if not team: + raise HTTPException( + status_code=400, + detail=f"Team with id {update_data['team_id']} not found", + ) + flow.team_id = update_data["team_id"] + flow_crd.spec.teamRef.name = team.name + flow_crd.spec.teamRef.namespace = team.namespace + + # Update workspace reference if changed + if "workspace_id" in update_data: + if update_data["workspace_id"]: + workspace = ( + db.query(TaskResource) + .filter( + TaskResource.id == update_data["workspace_id"], + TaskResource.kind == "Workspace", + TaskResource.is_active == True, + ) + .first() + ) + if not workspace: + raise HTTPException( + status_code=400, + detail=f"Workspace with id {update_data['workspace_id']} not found", + ) + from app.schemas.flow import FlowWorkspaceRef + + flow_crd.spec.workspaceRef = 
FlowWorkspaceRef( + name=workspace.name, namespace=workspace.namespace + ) + else: + flow_crd.spec.workspaceRef = None + flow.workspace_id = update_data["workspace_id"] + + # Update other fields + if "display_name" in update_data: + flow_crd.spec.displayName = update_data["display_name"] + + if "description" in update_data: + flow_crd.spec.description = update_data["description"] + + if "task_type" in update_data: + flow_crd.spec.taskType = update_data["task_type"] + + if "prompt_template" in update_data: + flow_crd.spec.promptTemplate = update_data["prompt_template"] + + if "retry_count" in update_data: + flow_crd.spec.retryCount = update_data["retry_count"] + + if "enabled" in update_data: + flow_crd.spec.enabled = update_data["enabled"] + flow.enabled = update_data["enabled"] + + # Update trigger configuration + if "trigger_type" in update_data or "trigger_config" in update_data: + trigger_type = update_data.get("trigger_type", flow.trigger_type) + trigger_config = update_data.get( + "trigger_config", + self._extract_trigger_config(flow_crd.spec.trigger), + ) + + # Generate new webhook token if switching to event trigger + if ( + trigger_type == FlowTriggerType.EVENT + and flow.trigger_type != FlowTriggerType.EVENT.value + ): + flow.webhook_token = secrets.token_urlsafe(32) + elif trigger_type != FlowTriggerType.EVENT: + flow.webhook_token = None + + flow_crd.spec.trigger = self._build_trigger_config( + trigger_type, trigger_config + ) + flow.trigger_type = ( + trigger_type.value + if isinstance(trigger_type, FlowTriggerType) + else trigger_type + ) + + # Recalculate next execution time + flow.next_execution_time = self._calculate_next_execution_time( + trigger_type, trigger_config + ) + + # Update status with webhook URL + if flow.webhook_token: + if flow_crd.status is None: + flow_crd.status = FlowStatus() + flow_crd.status.webhookUrl = f"/api/flows/webhook/{flow.webhook_token}" + + # Save changes + flow.json = flow_crd.model_dump(mode="json") + 
flow.updated_at = datetime.utcnow() + flag_modified(flow, "json") + + db.commit() + db.refresh(flow) + + return self._convert_to_flow_in_db(flow) + + def delete_flow( + self, + db: Session, + *, + flow_id: int, + user_id: int, + ) -> None: + """Delete a Flow (soft delete).""" + flow = ( + db.query(FlowResource) + .filter( + FlowResource.id == flow_id, + FlowResource.user_id == user_id, + FlowResource.is_active == True, + ) + .first() + ) + + if not flow: + raise HTTPException(status_code=404, detail="Flow not found") + + # Soft delete + flow.is_active = False + flow.enabled = False + flow.updated_at = datetime.utcnow() + + db.commit() + + def toggle_flow( + self, + db: Session, + *, + flow_id: int, + user_id: int, + enabled: bool, + ) -> FlowInDB: + """Enable or disable a Flow.""" + flow = ( + db.query(FlowResource) + .filter( + FlowResource.id == flow_id, + FlowResource.user_id == user_id, + FlowResource.is_active == True, + ) + .first() + ) + + if not flow: + raise HTTPException(status_code=404, detail="Flow not found") + + flow.enabled = enabled + flow_crd = Flow.model_validate(flow.json) + flow_crd.spec.enabled = enabled + + # Recalculate next execution time if enabling + if enabled: + flow.next_execution_time = self._calculate_next_execution_time( + flow.trigger_type, + self._extract_trigger_config(flow_crd.spec.trigger), + ) + else: + flow.next_execution_time = None + + flow.json = flow_crd.model_dump(mode="json") + flow.updated_at = datetime.utcnow() + flag_modified(flow, "json") + + db.commit() + db.refresh(flow) + + return self._convert_to_flow_in_db(flow) + + def trigger_flow_manually( + self, + db: Session, + *, + flow_id: int, + user_id: int, + ) -> FlowExecutionInDB: + """Manually trigger a Flow execution.""" + flow = ( + db.query(FlowResource) + .filter( + FlowResource.id == flow_id, + FlowResource.user_id == user_id, + FlowResource.is_active == True, + ) + .first() + ) + + if not flow: + raise HTTPException(status_code=404, detail="Flow not found") + 
+ # Create execution record + return self._create_execution( + db, + flow=flow, + user_id=user_id, + trigger_type="manual", + trigger_reason="Manually triggered by user", + ) + + def get_flow_by_webhook_token( + self, + db: Session, + *, + webhook_token: str, + ) -> Optional[FlowResource]: + """Get a Flow by webhook token.""" + return ( + db.query(FlowResource) + .filter( + FlowResource.webhook_token == webhook_token, + FlowResource.is_active == True, + FlowResource.enabled == True, + ) + .first() + ) + + def trigger_flow_by_webhook( + self, + db: Session, + *, + webhook_token: str, + payload: Dict[str, Any], + ) -> FlowExecutionInDB: + """Trigger a Flow via webhook.""" + flow = self.get_flow_by_webhook_token(db, webhook_token=webhook_token) + + if not flow: + raise HTTPException(status_code=404, detail="Flow not found or disabled") + + # Create execution with webhook data + return self._create_execution( + db, + flow=flow, + user_id=flow.user_id, + trigger_type="webhook", + trigger_reason="Triggered by webhook", + extra_variables={"webhook_data": payload}, + ) + + # ========== Execution Management ========== + + def list_executions( + self, + db: Session, + *, + user_id: int, + skip: int = 0, + limit: int = 50, + flow_id: Optional[int] = None, + status: Optional[List[FlowExecutionStatus]] = None, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, + ) -> tuple[List[FlowExecutionInDB], int]: + """List Flow executions (timeline view).""" + query = db.query(FlowExecution).filter(FlowExecution.user_id == user_id) + + if flow_id: + query = query.filter(FlowExecution.flow_id == flow_id) + + if status: + query = query.filter(FlowExecution.status.in_([s.value for s in status])) + + if start_date: + query = query.filter(FlowExecution.created_at >= start_date) + + if end_date: + query = query.filter(FlowExecution.created_at <= end_date) + + total = query.count() + executions = ( + query.order_by(desc(FlowExecution.created_at)) + .offset(skip) + 
.limit(limit) + .all() + ) + + # Enrich with flow details + result = [] + flow_cache = {} + for exec in executions: + exec_dict = self._convert_execution_to_dict(exec) + + # Get flow details (cached) + if exec.flow_id not in flow_cache: + flow = db.query(FlowResource).filter(FlowResource.id == exec.flow_id).first() + if flow: + flow_crd = Flow.model_validate(flow.json) + flow_cache[exec.flow_id] = { + "name": flow.name, + "display_name": flow_crd.spec.displayName, + "task_type": flow_crd.spec.taskType.value, + } + + flow_info = flow_cache.get(exec.flow_id, {}) + exec_dict["flow_name"] = flow_info.get("name") + exec_dict["flow_display_name"] = flow_info.get("display_name") + exec_dict["task_type"] = flow_info.get("task_type") + + # Get team name if available + if exec.flow_id in flow_cache: + flow = db.query(FlowResource).filter(FlowResource.id == exec.flow_id).first() + if flow and flow.team_id: + team = db.query(Kind).filter(Kind.id == flow.team_id).first() + if team: + exec_dict["team_name"] = team.name + + result.append(FlowExecutionInDB(**exec_dict)) + + return result, total + + def get_execution( + self, + db: Session, + *, + execution_id: int, + user_id: int, + ) -> FlowExecutionInDB: + """Get a specific Flow execution.""" + execution = ( + db.query(FlowExecution) + .filter( + FlowExecution.id == execution_id, + FlowExecution.user_id == user_id, + ) + .first() + ) + + if not execution: + raise HTTPException(status_code=404, detail="Execution not found") + + exec_dict = self._convert_execution_to_dict(execution) + + # Get flow details + flow = db.query(FlowResource).filter(FlowResource.id == execution.flow_id).first() + if flow: + flow_crd = Flow.model_validate(flow.json) + exec_dict["flow_name"] = flow.name + exec_dict["flow_display_name"] = flow_crd.spec.displayName + exec_dict["task_type"] = flow_crd.spec.taskType.value + + if flow.team_id: + team = db.query(Kind).filter(Kind.id == flow.team_id).first() + if team: + exec_dict["team_name"] = team.name + + 
return FlowExecutionInDB(**exec_dict) + + def update_execution_status( + self, + db: Session, + *, + execution_id: int, + status: FlowExecutionStatus, + result_summary: Optional[str] = None, + error_message: Optional[str] = None, + ) -> None: + """Update execution status (called by scheduler/task completion).""" + execution = db.query(FlowExecution).filter(FlowExecution.id == execution_id).first() + + if not execution: + return + + execution.status = status.value + execution.updated_at = datetime.utcnow() + + if status == FlowExecutionStatus.RUNNING: + execution.started_at = datetime.utcnow() + elif status in (FlowExecutionStatus.COMPLETED, FlowExecutionStatus.FAILED): + execution.completed_at = datetime.utcnow() + + if result_summary: + execution.result_summary = result_summary + + if error_message: + execution.error_message = error_message + + # Update flow statistics + flow = db.query(FlowResource).filter(FlowResource.id == execution.flow_id).first() + if flow: + flow.last_execution_time = datetime.utcnow() + flow.last_execution_status = status.value + flow.execution_count += 1 + + if status == FlowExecutionStatus.COMPLETED: + flow.success_count += 1 + elif status == FlowExecutionStatus.FAILED: + flow.failure_count += 1 + + # Update CRD status + flow_crd = Flow.model_validate(flow.json) + if flow_crd.status is None: + flow_crd.status = FlowStatus() + flow_crd.status.lastExecutionTime = datetime.utcnow() + flow_crd.status.lastExecutionStatus = status + flow_crd.status.executionCount = flow.execution_count + flow_crd.status.successCount = flow.success_count + flow_crd.status.failureCount = flow.failure_count + + flow.json = flow_crd.model_dump(mode="json") + flag_modified(flow, "json") + + db.commit() + + # ========== Helper Methods ========== + + def _create_execution( + self, + db: Session, + *, + flow: FlowResource, + user_id: int, + trigger_type: str, + trigger_reason: str, + extra_variables: Optional[Dict[str, Any]] = None, + ) -> FlowExecutionInDB: + 
"""Create a new Flow execution record.""" + flow_crd = Flow.model_validate(flow.json) + + # Resolve prompt template + resolved_prompt = self._resolve_prompt_template( + flow_crd.spec.promptTemplate, + flow_crd.spec.displayName, + extra_variables, + ) + + execution = FlowExecution( + user_id=user_id, + flow_id=flow.id, + trigger_type=trigger_type, + trigger_reason=trigger_reason, + prompt=resolved_prompt, + status=FlowExecutionStatus.PENDING.value, + ) + + db.add(execution) + db.commit() + db.refresh(execution) + + exec_dict = self._convert_execution_to_dict(execution) + exec_dict["flow_name"] = flow.name + exec_dict["flow_display_name"] = flow_crd.spec.displayName + exec_dict["task_type"] = flow_crd.spec.taskType.value + + return FlowExecutionInDB(**exec_dict) + + def _resolve_prompt_template( + self, + template: str, + flow_name: str, + extra_variables: Optional[Dict[str, Any]] = None, + ) -> str: + """Resolve prompt template with variables.""" + result = template + + # Replace standard variables + for var_name, var_func in self.TEMPLATE_VARIABLES.items(): + pattern = "{{" + var_name + "}}" + if pattern in result: + result = result.replace(pattern, var_func()) + + # Replace flow_name + result = result.replace("{{flow_name}}", flow_name) + + # Replace extra variables (like webhook_data) + if extra_variables: + for var_name, var_value in extra_variables.items(): + pattern = "{{" + var_name + "}}" + if pattern in result: + if isinstance(var_value, (dict, list)): + import json + + result = result.replace(pattern, json.dumps(var_value, ensure_ascii=False)) + else: + result = result.replace(pattern, str(var_value)) + + return result + + def _build_flow_crd( + self, + flow_in: FlowCreate, + team: Kind, + workspace: Optional[TaskResource], + webhook_token: Optional[str], + ) -> Flow: + """Build Flow CRD JSON structure.""" + from app.schemas.flow import FlowTeamRef, FlowWorkspaceRef + + # Build trigger config + trigger = self._build_trigger_config( + flow_in.trigger_type, 
flow_in.trigger_config + ) + + spec = FlowSpec( + displayName=flow_in.display_name, + taskType=flow_in.task_type, + trigger=trigger, + teamRef=FlowTeamRef(name=team.name, namespace=team.namespace), + workspaceRef=( + FlowWorkspaceRef(name=workspace.name, namespace=workspace.namespace) + if workspace + else None + ), + promptTemplate=flow_in.prompt_template, + retryCount=flow_in.retry_count, + enabled=flow_in.enabled, + description=flow_in.description, + ) + + status = FlowStatus() + if webhook_token: + status.webhookUrl = f"/api/flows/webhook/{webhook_token}" + + return Flow( + metadata=FlowMetadata( + name=flow_in.name, + namespace=flow_in.namespace, + displayName=flow_in.display_name, + ), + spec=spec, + status=status, + ) + + def _build_trigger_config( + self, + trigger_type: FlowTriggerType, + trigger_config: Dict[str, Any], + ) -> FlowTriggerConfig: + """Build FlowTriggerConfig from trigger type and config dict.""" + from app.schemas.flow import ( + CronTriggerConfig, + EventTriggerConfig, + FlowEventType, + GitPushEventConfig, + IntervalTriggerConfig, + OneTimeTriggerConfig, + ) + + trigger_type_enum = ( + trigger_type + if isinstance(trigger_type, FlowTriggerType) + else FlowTriggerType(trigger_type) + ) + + if trigger_type_enum == FlowTriggerType.CRON: + return FlowTriggerConfig( + type=trigger_type_enum, + cron=CronTriggerConfig( + expression=trigger_config.get("expression", "0 9 * * *"), + timezone=trigger_config.get("timezone", "UTC"), + ), + ) + elif trigger_type_enum == FlowTriggerType.INTERVAL: + return FlowTriggerConfig( + type=trigger_type_enum, + interval=IntervalTriggerConfig( + value=trigger_config.get("value", 1), + unit=trigger_config.get("unit", "hours"), + ), + ) + elif trigger_type_enum == FlowTriggerType.ONE_TIME: + return FlowTriggerConfig( + type=trigger_type_enum, + one_time=OneTimeTriggerConfig( + execute_at=datetime.fromisoformat(trigger_config.get("execute_at")), + ), + ) + elif trigger_type_enum == FlowTriggerType.EVENT: + event_type 
= trigger_config.get("event_type", "webhook") + git_push_config = None + + if event_type == "git_push": + git_push_data = trigger_config.get("git_push", {}) + git_push_config = GitPushEventConfig( + repository=git_push_data.get("repository", ""), + branch=git_push_data.get("branch"), + ) + + return FlowTriggerConfig( + type=trigger_type_enum, + event=EventTriggerConfig( + event_type=FlowEventType(event_type), + git_push=git_push_config, + ), + ) + + raise ValueError(f"Unknown trigger type: {trigger_type}") + + def _extract_trigger_config(self, trigger: FlowTriggerConfig) -> Dict[str, Any]: + """Extract trigger config dict from FlowTriggerConfig.""" + if trigger.type == FlowTriggerType.CRON and trigger.cron: + return { + "expression": trigger.cron.expression, + "timezone": trigger.cron.timezone, + } + elif trigger.type == FlowTriggerType.INTERVAL and trigger.interval: + return { + "value": trigger.interval.value, + "unit": trigger.interval.unit, + } + elif trigger.type == FlowTriggerType.ONE_TIME and trigger.one_time: + return { + "execute_at": trigger.one_time.execute_at.isoformat(), + } + elif trigger.type == FlowTriggerType.EVENT and trigger.event: + result = {"event_type": trigger.event.event_type.value} + if trigger.event.git_push: + result["git_push"] = { + "repository": trigger.event.git_push.repository, + "branch": trigger.event.git_push.branch, + } + return result + + return {} + + def _calculate_next_execution_time( + self, + trigger_type: FlowTriggerType, + trigger_config: Dict[str, Any], + ) -> Optional[datetime]: + """Calculate the next execution time based on trigger configuration.""" + trigger_type_enum = ( + trigger_type + if isinstance(trigger_type, FlowTriggerType) + else FlowTriggerType(trigger_type) + ) + + now = datetime.utcnow() + + if trigger_type_enum == FlowTriggerType.CRON: + # Use croniter to calculate next run + try: + from croniter import croniter + + cron_expr = trigger_config.get("expression", "0 9 * * *") + iter = croniter(cron_expr, 
now) + return iter.get_next(datetime) + except Exception as e: + logger.warning(f"Failed to parse cron expression: {e}") + return None + + elif trigger_type_enum == FlowTriggerType.INTERVAL: + value = trigger_config.get("value", 1) + unit = trigger_config.get("unit", "hours") + + if unit == "minutes": + return now + timedelta(minutes=value) + elif unit == "hours": + return now + timedelta(hours=value) + elif unit == "days": + return now + timedelta(days=value) + + elif trigger_type_enum == FlowTriggerType.ONE_TIME: + execute_at = trigger_config.get("execute_at") + if execute_at: + if isinstance(execute_at, str): + return datetime.fromisoformat(execute_at.replace("Z", "+00:00")) + return execute_at + + # Event triggers don't have scheduled next execution + return None + + def _convert_to_flow_in_db(self, flow: FlowResource) -> FlowInDB: + """Convert FlowResource to FlowInDB.""" + flow_crd = Flow.model_validate(flow.json) + + # Build webhook URL + webhook_url = None + if flow.webhook_token: + webhook_url = f"/api/flows/webhook/{flow.webhook_token}" + + return FlowInDB( + id=flow.id, + user_id=flow.user_id, + name=flow.name, + namespace=flow.namespace, + display_name=flow_crd.spec.displayName, + description=flow_crd.spec.description, + task_type=flow_crd.spec.taskType, + trigger_type=FlowTriggerType(flow.trigger_type), + trigger_config=self._extract_trigger_config(flow_crd.spec.trigger), + team_id=flow.team_id, + workspace_id=flow.workspace_id, + prompt_template=flow_crd.spec.promptTemplate, + retry_count=flow_crd.spec.retryCount, + enabled=flow.enabled, + webhook_url=webhook_url, + last_execution_time=flow.last_execution_time, + last_execution_status=flow.last_execution_status, + next_execution_time=flow.next_execution_time, + execution_count=flow.execution_count, + success_count=flow.success_count, + failure_count=flow.failure_count, + created_at=flow.created_at, + updated_at=flow.updated_at, + ) + + def _convert_execution_to_dict(self, execution: FlowExecution) -> 
Dict[str, Any]: + """Convert FlowExecution to dict.""" + return { + "id": execution.id, + "user_id": execution.user_id, + "flow_id": execution.flow_id, + "task_id": execution.task_id, + "trigger_type": execution.trigger_type, + "trigger_reason": execution.trigger_reason, + "prompt": execution.prompt, + "status": FlowExecutionStatus(execution.status), + "result_summary": execution.result_summary, + "error_message": execution.error_message, + "retry_attempt": execution.retry_attempt, + "started_at": execution.started_at, + "completed_at": execution.completed_at, + "created_at": execution.created_at, + "updated_at": execution.updated_at, + } + + +flow_service = FlowService() diff --git a/backend/app/services/flow_scheduler.py b/backend/app/services/flow_scheduler.py new file mode 100644 index 000000000..889ffa58e --- /dev/null +++ b/backend/app/services/flow_scheduler.py @@ -0,0 +1,389 @@ +# SPDX-FileCopyrightText: 2025 Weibo, Inc. +# +# SPDX-License-Identifier: Apache-2.0 + +""" +Flow scheduler for triggering scheduled Flow executions. + +This module integrates with the existing background job system to periodically +check for flows that need to be executed and trigger them. 
"""
Flow scheduler for triggering scheduled Flow executions.

This module integrates with the existing background job system to periodically
check for flows that need to be executed and trigger them.
"""
import asyncio
import logging
import threading
import uuid
from datetime import datetime, timedelta
from typing import List, Optional

from sqlalchemy.orm.attributes import flag_modified

from app.core.cache import cache_manager
from app.core.config import settings
from app.db.session import SessionLocal
from app.models.flow import FlowExecution, FlowResource
from app.schemas.flow import Flow, FlowExecutionStatus, FlowTriggerType
from app.services.flow import flow_service

logger = logging.getLogger(__name__)

# Configuration
FLOW_SCHEDULER_INTERVAL_SECONDS = getattr(settings, "FLOW_SCHEDULER_INTERVAL_SECONDS", 60)
FLOW_SCHEDULER_LOCK_KEY = "flow_scheduler_lock"
# Expire slightly before the next cycle so a crashed holder releases the lock.
FLOW_SCHEDULER_LOCK_EXPIRY = max(FLOW_SCHEDULER_INTERVAL_SECONDS - 10, 10)


async def acquire_flow_scheduler_lock() -> bool:
    """
    Try to acquire distributed lock to ensure only one instance executes the scheduler.

    Returns False both when another instance holds the lock and when the
    cache backend errors (fail-closed).
    """
    try:
        acquired = await cache_manager.setnx(
            FLOW_SCHEDULER_LOCK_KEY, True, expire=FLOW_SCHEDULER_LOCK_EXPIRY
        )
        if acquired:
            logger.debug(
                f"[flow_scheduler] Successfully acquired distributed lock: {FLOW_SCHEDULER_LOCK_KEY}"
            )
        else:
            logger.debug(
                f"[flow_scheduler] Lock is held by another instance: {FLOW_SCHEDULER_LOCK_KEY}"
            )
        return acquired
    except Exception as e:
        logger.error(f"[flow_scheduler] Error acquiring distributed lock: {str(e)}")
        return False


async def release_flow_scheduler_lock() -> bool:
    """Release distributed lock; returns False on cache errors."""
    try:
        return await cache_manager.delete(FLOW_SCHEDULER_LOCK_KEY)
    except Exception as e:
        logger.error(f"[flow_scheduler] Error releasing lock: {str(e)}")
        return False


def get_due_flows(db, now: datetime) -> List[FlowResource]:
    """
    Get all enabled flows that are due for execution.

    Returns flows where:
    - is_active = True
    - enabled = True
    - next_execution_time <= now
    - trigger_type is cron, interval, or one_time (not event)
    """
    return (
        db.query(FlowResource)
        .filter(
            FlowResource.is_active == True,
            FlowResource.enabled == True,
            FlowResource.next_execution_time != None,
            FlowResource.next_execution_time <= now,
            FlowResource.trigger_type.in_([
                FlowTriggerType.CRON.value,
                FlowTriggerType.INTERVAL.value,
                FlowTriggerType.ONE_TIME.value,
            ]),
        )
        .all()
    )


def execute_flow(db, flow: FlowResource) -> None:
    """
    Execute a single flow by creating an execution record.

    The actual task execution is handled asynchronously by the task system.
    Fix: the flow CRD was validated twice from the same unchanged JSON;
    flag_modified is now applied after rewriting flow.json, matching the
    convention used throughout app/services/flow.py.
    """
    try:
        flow_crd = Flow.model_validate(flow.json)
        trigger_type = flow.trigger_type

        # Determine trigger reason based on trigger type
        if trigger_type == FlowTriggerType.CRON.value:
            trigger_reason = f"Scheduled (cron: {flow_crd.spec.trigger.cron.expression})"
        elif trigger_type == FlowTriggerType.INTERVAL.value:
            interval = flow_crd.spec.trigger.interval
            trigger_reason = f"Scheduled (interval: {interval.value} {interval.unit})"
        elif trigger_type == FlowTriggerType.ONE_TIME.value:
            trigger_reason = "One-time scheduled execution"
        else:
            trigger_reason = "Scheduled execution"

        # Create execution record
        execution = flow_service._create_execution(
            db,
            flow=flow,
            user_id=flow.user_id,
            trigger_type=trigger_type,
            trigger_reason=trigger_reason,
        )
        logger.info(
            f"[flow_scheduler] Created execution {execution.id} for flow {flow.id} ({flow.name})"
        )

        trigger_config = flow_service._extract_trigger_config(flow_crd.spec.trigger)

        if trigger_type == FlowTriggerType.ONE_TIME.value:
            # One-time flows should be disabled after execution
            flow.enabled = False
            flow.next_execution_time = None
            flow_crd.spec.enabled = False
            flow.json = flow_crd.model_dump(mode="json")
            flag_modified(flow, "json")
            logger.info(
                f"[flow_scheduler] One-time flow {flow.id} disabled after execution"
            )
        else:
            # Calculate next execution time for recurring flows
            flow.next_execution_time = flow_service._calculate_next_execution_time(
                trigger_type, trigger_config
            )
            logger.info(
                f"[flow_scheduler] Next execution for flow {flow.id}: {flow.next_execution_time}"
            )

        db.commit()

        # Hand off to the task system.
        _trigger_task_execution(db, flow, execution)

    except Exception as e:
        logger.error(
            f"[flow_scheduler] Error executing flow {flow.id} ({flow.name}): {str(e)}"
        )
        db.rollback()


def _trigger_task_execution(db, flow: FlowResource, execution) -> None:
    """
    Trigger the actual task execution for a flow.

    Creates a Task resource, links it to the flow execution, and marks the
    execution RUNNING. ``execution`` is the pydantic FlowExecutionInDB
    returned by _create_execution, NOT the ORM row.

    Fixes over the original:
    - execution.task_id was assigned on the pydantic model and committed,
      so the task link was never persisted; the ORM row is updated instead.
    - removed dead locals (git_url/git_repo/git_domain/branch_name were
      computed from the workspace but never attached to the task — NOTE
      (review): confirm whether they were meant to go into task_json).
    - removed the redundant re-assignment of PENDING (status is already
      PENDING from creation) and the unused flag_modified import.
    """
    try:
        # Local imports mirror the original (presumably avoiding cycles).
        from app.models.task import TaskResource
        from app.models.kind import Kind

        # Get team
        team = (
            db.query(Kind)
            .filter(Kind.id == flow.team_id, Kind.kind == "Team", Kind.is_active == True)
            .first()
        )
        if not team:
            logger.error(f"[flow_scheduler] Team {flow.team_id} not found for flow {flow.id}")
            flow_service.update_execution_status(
                db,
                execution_id=execution.id,
                status=FlowExecutionStatus.FAILED,
                error_message=f"Team {flow.team_id} not found",
            )
            return

        # Get workspace if specified
        workspace = None
        if flow.workspace_id:
            workspace = (
                db.query(TaskResource)
                .filter(
                    TaskResource.id == flow.workspace_id,
                    TaskResource.kind == "Workspace",
                    TaskResource.is_active == True,
                )
                .first()
            )

        task_name = f"flow-{flow.id}-exec-{execution.id}-{uuid.uuid4().hex[:8]}"

        task_json = {
            "apiVersion": "agent.wecode.io/v1",
            "kind": "Task",
            "metadata": {
                "name": task_name,
                "namespace": flow.namespace,
            },
            "spec": {
                "teamRef": {
                    "name": team.name,
                    "namespace": team.namespace,
                },
                "workspaceRef": (
                    {
                        "name": workspace.name if workspace else "",
                        "namespace": workspace.namespace if workspace else "",
                    }
                    if workspace
                    else None
                ),
                "prompt": execution.prompt,
            },
            "status": {
                "phase": "PENDING",
                "progress": 0,
            },
        }

        task = TaskResource(
            user_id=flow.user_id,
            kind="Task",
            name=task_name,
            namespace=flow.namespace,
            json=task_json,
            is_active=True,
        )
        db.add(task)
        db.commit()
        db.refresh(task)

        # Link task to execution on the ORM row (persisted, unlike mutating
        # the pydantic `execution` object).
        db.query(FlowExecution).filter(FlowExecution.id == execution.id).update(
            {"task_id": task.id}
        )
        db.commit()

        logger.info(
            f"[flow_scheduler] Created task {task.id} for flow execution {execution.id}"
        )

        # Update execution status to running
        flow_service.update_execution_status(
            db,
            execution_id=execution.id,
            status=FlowExecutionStatus.RUNNING,
        )

    except Exception as e:
        logger.error(
            f"[flow_scheduler] Error triggering task for flow {flow.id}: {str(e)}"
        )
        flow_service.update_execution_status(
            db,
            execution_id=execution.id,
            status=FlowExecutionStatus.FAILED,
            error_message=str(e),
        )


def flow_scheduler_worker(stop_event: threading.Event) -> None:
    """
    Background worker for the flow scheduler.

    Periodically checks for flows that need execution and triggers them.
    Fix: the per-cycle event loop was only closed inside the lock-held
    branch, leaking a loop every cycle on instances that did not win the
    distributed lock; the loop is now closed in a finally that always runs.
    """
    logger.info("[flow_scheduler] Flow scheduler worker started")

    while not stop_event.is_set():
        try:
            # Create async runtime for distributed locking
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                lock_acquired = loop.run_until_complete(acquire_flow_scheduler_lock())

                if not lock_acquired:
                    logger.debug(
                        "[flow_scheduler] Another instance is running the scheduler, skipping"
                    )
                else:
                    try:
                        logger.debug("[flow_scheduler] Starting scheduler cycle")

                        db = SessionLocal()
                        try:
                            now = datetime.utcnow()
                            due_flows = get_due_flows(db, now)

                            if due_flows:
                                logger.info(
                                    f"[flow_scheduler] Found {len(due_flows)} flow(s) due for execution"
                                )
                                for flow in due_flows:
                                    execute_flow(db, flow)
                            else:
                                logger.debug("[flow_scheduler] No flows due for execution")
                        finally:
                            db.close()

                        logger.debug("[flow_scheduler] Scheduler cycle completed")

                    except Exception as e:
                        logger.error(f"[flow_scheduler] Error in scheduler cycle: {str(e)}")
                    finally:
                        # Release lock
                        try:
                            loop.run_until_complete(release_flow_scheduler_lock())
                        except Exception as e:
                            logger.error(f"[flow_scheduler] Error releasing lock: {str(e)}")
            finally:
                # Always close the loop, even when the lock was not acquired.
                loop.close()

        except Exception as e:
            logger.error(f"[flow_scheduler] Worker error: {str(e)}")

        # Wait for next cycle
        stop_event.wait(timeout=FLOW_SCHEDULER_INTERVAL_SECONDS)

    logger.info("[flow_scheduler] Flow scheduler worker stopped")


def start_flow_scheduler(app) -> None:
    """
    Start the flow scheduler background worker.

    Args:
        app: FastAPI application instance
    """
    app.state.flow_scheduler_stop_event = threading.Event()
    app.state.flow_scheduler_thread = threading.Thread(
        target=flow_scheduler_worker,
        args=(app.state.flow_scheduler_stop_event,),
        name="flow-scheduler-worker",
        daemon=True,
    )
    app.state.flow_scheduler_thread.start()
    logger.info("[flow_scheduler] Flow scheduler worker started")


def stop_flow_scheduler(app) -> None:
    """
    Stop the flow scheduler background worker.

    Signals the stop event and joins the thread with a 5s timeout; no-ops
    when the scheduler was never started.

    Args:
        app: FastAPI application instance
    """
    stop_event = getattr(app.state, "flow_scheduler_stop_event", None)
    thread = getattr(app.state, "flow_scheduler_thread", None)

    if stop_event:
        stop_event.set()

    if thread:
        thread.join(timeout=5.0)

    logger.info("[flow_scheduler] Flow scheduler worker stopped")
+ + Args: + app: FastAPI application instance + """ + stop_event = getattr(app.state, "flow_scheduler_stop_event", None) + thread = getattr(app.state, "flow_scheduler_thread", None) + + if stop_event: + stop_event.set() + + if thread: + thread.join(timeout=5.0) + + logger.info("[flow_scheduler] Flow scheduler worker stopped") diff --git a/backend/app/services/jobs.py b/backend/app/services/jobs.py index 358e89fb0..7e2765766 100644 --- a/backend/app/services/jobs.py +++ b/backend/app/services/jobs.py @@ -17,6 +17,7 @@ from app.core.config import settings from app.db.session import SessionLocal from app.services.adapters.executor_job import job_service +from app.services.flow_scheduler import start_flow_scheduler, stop_flow_scheduler from app.services.repository_job import repository_job_service logger = logging.getLogger(__name__) @@ -181,6 +182,9 @@ def start_background_jobs(app): app.state.repo_update_thread.start() logger.info("[job] repository update worker started") + # Start flow scheduler thread + start_flow_scheduler(app) + def stop_background_jobs(app): """ @@ -206,3 +210,6 @@ def stop_background_jobs(app): if repo_thread: repo_thread.join(timeout=5.0) logger.info("[job] repository update worker stopped") + + # Stop flow scheduler thread gracefully + stop_flow_scheduler(app) diff --git a/backend/tests/api/endpoints/test_flows.py b/backend/tests/api/endpoints/test_flows.py new file mode 100644 index 000000000..69f186ab8 --- /dev/null +++ b/backend/tests/api/endpoints/test_flows.py @@ -0,0 +1,287 @@ +# SPDX-FileCopyrightText: 2025 Weibo, Inc. +# +# SPDX-License-Identifier: Apache-2.0 + +""" +Tests for Flow API endpoints. 
+""" +import json +from datetime import datetime, timedelta +from unittest.mock import patch + +import pytest +from fastapi import status + +from app.models.flow import FlowExecution, FlowResource +from app.models.kind import Kind +from app.schemas.flow import FlowExecutionStatus, FlowTaskType, FlowTriggerType + + +class TestFlowEndpoints: + """Test Flow API endpoints.""" + + @pytest.fixture + def test_team(self, db_session, test_user): + """Create a test team.""" + team = Kind( + user_id=test_user.id, + kind="Team", + name="test-team", + namespace="default", + json={ + "apiVersion": "agent.wecode.io/v1", + "kind": "Team", + "metadata": {"name": "test-team", "namespace": "default"}, + "spec": {"displayName": "Test Team", "members": []}, + }, + is_active=True, + ) + db_session.add(team) + db_session.commit() + db_session.refresh(team) + return team + + @pytest.fixture + def test_flow(self, db_session, test_user, test_team): + """Create a test flow.""" + flow_json = { + "apiVersion": "agent.wecode.io/v1", + "kind": "Flow", + "metadata": {"name": "test-flow", "namespace": "default"}, + "spec": { + "displayName": "Test Flow", + "taskType": "collection", + "trigger": { + "type": "cron", + "cron": {"expression": "0 9 * * *", "timezone": "UTC"}, + }, + "teamRef": {"name": "test-team", "namespace": "default"}, + "promptTemplate": "Test prompt", + "retryCount": 0, + "enabled": True, + }, + "status": {"state": "Available"}, + } + flow = FlowResource( + user_id=test_user.id, + kind="Flow", + name="test-flow", + namespace="default", + json=flow_json, + is_active=True, + enabled=True, + trigger_type="cron", + team_id=test_team.id, + ) + db_session.add(flow) + db_session.commit() + db_session.refresh(flow) + return flow + + def test_create_flow(self, client, test_user, test_team, auth_headers): + """Test creating a new flow.""" + response = client.post( + "/api/flows", + json={ + "name": "new-flow", + "display_name": "New Flow", + "task_type": "collection", + "trigger_type": 
"cron", + "trigger_config": {"expression": "0 9 * * *", "timezone": "UTC"}, + "team_id": test_team.id, + "prompt_template": "Test prompt {{date}}", + "retry_count": 1, + "enabled": True, + }, + headers=auth_headers, + ) + assert response.status_code == status.HTTP_201_CREATED + data = response.json() + assert data["name"] == "new-flow" + assert data["display_name"] == "New Flow" + assert data["enabled"] == True + + def test_list_flows(self, client, test_flow, auth_headers): + """Test listing flows.""" + response = client.get("/api/flows", headers=auth_headers) + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data["total"] >= 1 + assert len(data["items"]) >= 1 + + def test_get_flow(self, client, test_flow, auth_headers): + """Test getting a specific flow.""" + response = client.get(f"/api/flows/{test_flow.id}", headers=auth_headers) + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data["id"] == test_flow.id + assert data["name"] == "test-flow" + + def test_update_flow(self, client, test_flow, auth_headers): + """Test updating a flow.""" + response = client.put( + f"/api/flows/{test_flow.id}", + json={"display_name": "Updated Flow Name", "enabled": False}, + headers=auth_headers, + ) + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data["display_name"] == "Updated Flow Name" + assert data["enabled"] == False + + def test_delete_flow(self, client, test_flow, auth_headers): + """Test deleting a flow.""" + response = client.delete(f"/api/flows/{test_flow.id}", headers=auth_headers) + assert response.status_code == status.HTTP_204_NO_CONTENT + + # Verify soft delete + response = client.get(f"/api/flows/{test_flow.id}", headers=auth_headers) + assert response.status_code == status.HTTP_404_NOT_FOUND + + def test_toggle_flow(self, client, test_flow, auth_headers): + """Test toggling flow enabled/disabled.""" + # Disable + response = client.post( + 
f"/api/flows/{test_flow.id}/toggle?enabled=false", headers=auth_headers + ) + assert response.status_code == status.HTTP_200_OK + assert response.json()["enabled"] == False + + # Enable + response = client.post( + f"/api/flows/{test_flow.id}/toggle?enabled=true", headers=auth_headers + ) + assert response.status_code == status.HTTP_200_OK + assert response.json()["enabled"] == True + + def test_trigger_flow(self, client, test_flow, auth_headers): + """Test manually triggering a flow.""" + response = client.post( + f"/api/flows/{test_flow.id}/trigger", headers=auth_headers + ) + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data["flow_id"] == test_flow.id + assert data["trigger_type"] == "manual" + assert data["status"] == "PENDING" + + def test_create_flow_without_team(self, client, auth_headers): + """Test creating flow without valid team fails.""" + response = client.post( + "/api/flows", + json={ + "name": "invalid-flow", + "display_name": "Invalid Flow", + "task_type": "collection", + "trigger_type": "cron", + "trigger_config": {"expression": "0 9 * * *"}, + "team_id": 99999, # Non-existent team + "prompt_template": "Test", + }, + headers=auth_headers, + ) + assert response.status_code == status.HTTP_400_BAD_REQUEST + + +class TestFlowExecutionEndpoints: + """Test Flow Execution API endpoints.""" + + @pytest.fixture + def test_team(self, db_session, test_user): + """Create a test team.""" + team = Kind( + user_id=test_user.id, + kind="Team", + name="test-team", + namespace="default", + json={"apiVersion": "agent.wecode.io/v1", "kind": "Team"}, + is_active=True, + ) + db_session.add(team) + db_session.commit() + db_session.refresh(team) + return team + + @pytest.fixture + def test_flow(self, db_session, test_user, test_team): + """Create a test flow.""" + flow = FlowResource( + user_id=test_user.id, + kind="Flow", + name="test-flow", + namespace="default", + json={ + "apiVersion": "agent.wecode.io/v1", + "kind": "Flow", + 
"spec": { + "displayName": "Test Flow", + "taskType": "collection", + "trigger": {"type": "cron", "cron": {"expression": "0 9 * * *"}}, + "teamRef": {"name": "test-team"}, + "promptTemplate": "Test", + }, + }, + is_active=True, + enabled=True, + trigger_type="cron", + team_id=test_team.id, + ) + db_session.add(flow) + db_session.commit() + db_session.refresh(flow) + return flow + + @pytest.fixture + def test_execution(self, db_session, test_user, test_flow): + """Create a test execution.""" + execution = FlowExecution( + user_id=test_user.id, + flow_id=test_flow.id, + trigger_type="cron", + trigger_reason="Scheduled execution", + prompt="Test prompt", + status="COMPLETED", + result_summary="Test completed successfully", + started_at=datetime.utcnow() - timedelta(minutes=5), + completed_at=datetime.utcnow(), + ) + db_session.add(execution) + db_session.commit() + db_session.refresh(execution) + return execution + + def test_list_executions(self, client, test_execution, auth_headers): + """Test listing executions.""" + response = client.get("/api/flows/executions", headers=auth_headers) + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data["total"] >= 1 + assert len(data["items"]) >= 1 + + def test_get_execution(self, client, test_execution, auth_headers): + """Test getting a specific execution.""" + response = client.get( + f"/api/flows/executions/{test_execution.id}", headers=auth_headers + ) + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data["id"] == test_execution.id + assert data["status"] == "COMPLETED" + + def test_list_executions_with_filters( + self, client, test_flow, test_execution, auth_headers + ): + """Test listing executions with filters.""" + # Filter by flow_id + response = client.get( + f"/api/flows/executions?flow_id={test_flow.id}", headers=auth_headers + ) + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert all(item["flow_id"] == 
test_flow.id for item in data["items"]) + + # Filter by status + response = client.get( + "/api/flows/executions?status=COMPLETED", headers=auth_headers + ) + assert response.status_code == status.HTTP_200_OK diff --git a/frontend/src/apis/flow.ts b/frontend/src/apis/flow.ts new file mode 100644 index 000000000..30f848018 --- /dev/null +++ b/frontend/src/apis/flow.ts @@ -0,0 +1,126 @@ +// SPDX-FileCopyrightText: 2025 Weibo, Inc. +// +// SPDX-License-Identifier: Apache-2.0 + +/** + * AI Flow (智能流) API client. + */ +import { apiClient } from './client' +import type { + Flow, + FlowCreateRequest, + FlowExecution, + FlowExecutionListResponse, + FlowExecutionStatus, + FlowListResponse, + FlowTriggerType, + FlowUpdateRequest, +} from '@/types/flow' +import type { PaginationParams } from '@/types/api' + +export const flowApis = { + /** + * List user's flow configurations + */ + async getFlows( + params?: PaginationParams, + enabled?: boolean, + triggerType?: FlowTriggerType + ): Promise { + const queryParams = new URLSearchParams() + queryParams.append('page', String(params?.page || 1)) + queryParams.append('limit', String(params?.limit || 20)) + + if (enabled !== undefined) { + queryParams.append('enabled', String(enabled)) + } + + if (triggerType) { + queryParams.append('trigger_type', triggerType) + } + + return apiClient.get(`/flows?${queryParams.toString()}`) + }, + + /** + * Create a new flow + */ + async createFlow(data: FlowCreateRequest): Promise { + return apiClient.post('/flows', data) + }, + + /** + * Get a specific flow by ID + */ + async getFlow(id: number): Promise { + return apiClient.get(`/flows/${id}`) + }, + + /** + * Update a flow + */ + async updateFlow(id: number, data: FlowUpdateRequest): Promise { + return apiClient.put(`/flows/${id}`, data) + }, + + /** + * Delete a flow + */ + async deleteFlow(id: number): Promise { + await apiClient.delete(`/flows/${id}`) + }, + + /** + * Toggle flow enabled/disabled + */ + async toggleFlow(id: number, enabled: 
boolean): Promise { + return apiClient.post(`/flows/${id}/toggle?enabled=${enabled}`) + }, + + /** + * Manually trigger a flow + */ + async triggerFlow(id: number): Promise { + return apiClient.post(`/flows/${id}/trigger`) + }, + + /** + * List flow executions (timeline) + */ + async getExecutions( + params?: PaginationParams, + flowId?: number, + status?: FlowExecutionStatus[], + startDate?: string, + endDate?: string + ): Promise { + const queryParams = new URLSearchParams() + queryParams.append('page', String(params?.page || 1)) + queryParams.append('limit', String(params?.limit || 50)) + + if (flowId) { + queryParams.append('flow_id', String(flowId)) + } + + if (status && status.length > 0) { + status.forEach(s => queryParams.append('status', s)) + } + + if (startDate) { + queryParams.append('start_date', startDate) + } + + if (endDate) { + queryParams.append('end_date', endDate) + } + + return apiClient.get(`/flows/executions?${queryParams.toString()}`) + }, + + /** + * Get a specific execution by ID + */ + async getExecution(id: number): Promise { + return apiClient.get(`/flows/executions/${id}`) + }, +} diff --git a/frontend/src/app/flow/page.tsx b/frontend/src/app/flow/page.tsx new file mode 100644 index 000000000..05ae3c01e --- /dev/null +++ b/frontend/src/app/flow/page.tsx @@ -0,0 +1,9 @@ +// SPDX-FileCopyrightText: 2025 Weibo, Inc. 
+// +// SPDX-License-Identifier: Apache-2.0 + +import { FlowPage } from '@/features/flows/components' + +export default function Flow() { + return +} diff --git a/frontend/src/config/paths.ts b/frontend/src/config/paths.ts index ac9f0793c..3404206b9 100644 --- a/frontend/src/config/paths.ts +++ b/frontend/src/config/paths.ts @@ -32,6 +32,9 @@ export const paths = { wiki: { getHref: () => '/knowledge', }, + flow: { + getHref: () => '/flow', + }, settings: { root: { getHref: () => '/settings', diff --git a/frontend/src/features/flows/components/FlowForm.tsx b/frontend/src/features/flows/components/FlowForm.tsx new file mode 100644 index 000000000..98ec4a1db --- /dev/null +++ b/frontend/src/features/flows/components/FlowForm.tsx @@ -0,0 +1,513 @@ +'use client' + +// SPDX-FileCopyrightText: 2025 Weibo, Inc. +// +// SPDX-License-Identifier: Apache-2.0 + +/** + * Flow creation/edit form component. + */ +import { useCallback, useEffect, useState } from 'react' +import { useTranslation } from '@/hooks/useTranslation' +import { Button } from '@/components/ui/button' +import { Input } from '@/components/ui/input' +import { Label } from '@/components/ui/label' +import { Textarea } from '@/components/ui/textarea' +import { + Select, + SelectContent, + SelectItem, + SelectTrigger, + SelectValue, +} from '@/components/ui/select' +import { + Sheet, + SheetContent, + SheetDescription, + SheetFooter, + SheetHeader, + SheetTitle, +} from '@/components/ui/sheet' +import { Switch } from '@/components/ui/switch' +import { flowApis } from '@/apis/flow' +import { teamApis } from '@/apis/team' +import type { Team } from '@/types/api' +import type { + Flow, + FlowCreateRequest, + FlowTaskType, + FlowTriggerType, + FlowUpdateRequest, +} from '@/types/flow' +import { toast } from 'sonner' + +interface FlowFormProps { + open: boolean + onOpenChange: (open: boolean) => void + flow?: Flow | null + onSuccess: () => void +} + +const defaultTriggerConfig: Record> = { + cron: { expression: '0 9 * * 
*', timezone: 'UTC' }, + interval: { value: 1, unit: 'hours' }, + one_time: { execute_at: new Date().toISOString() }, + event: { event_type: 'webhook' }, +} + +export function FlowForm({ open, onOpenChange, flow, onSuccess }: FlowFormProps) { + const { t } = useTranslation('flow') + const isEditing = !!flow + + // Form state + const [name, setName] = useState('') + const [displayName, setDisplayName] = useState('') + const [description, setDescription] = useState('') + const [taskType, setTaskType] = useState('collection') + const [triggerType, setTriggerType] = useState('cron') + const [triggerConfig, setTriggerConfig] = useState>( + defaultTriggerConfig.cron + ) + const [teamId, setTeamId] = useState(null) + const [promptTemplate, setPromptTemplate] = useState('') + const [retryCount, setRetryCount] = useState(0) + const [enabled, setEnabled] = useState(true) + + // Teams for selection + const [teams, setTeams] = useState([]) + const [teamsLoading, setTeamsLoading] = useState(false) + + // Submit state + const [submitting, setSubmitting] = useState(false) + + // Load teams + useEffect(() => { + const loadTeams = async () => { + setTeamsLoading(true) + try { + const response = await teamApis.getTeams({ page: 1, limit: 100 }) + setTeams(response.items) + } catch (error) { + console.error('Failed to load teams:', error) + } finally { + setTeamsLoading(false) + } + } + if (open) { + loadTeams() + } + }, [open]) + + // Reset form when flow changes + useEffect(() => { + if (flow) { + setName(flow.name) + setDisplayName(flow.display_name) + setDescription(flow.description || '') + setTaskType(flow.task_type) + setTriggerType(flow.trigger_type) + setTriggerConfig(flow.trigger_config) + setTeamId(flow.team_id) + setPromptTemplate(flow.prompt_template) + setRetryCount(flow.retry_count) + setEnabled(flow.enabled) + } else { + setName('') + setDisplayName('') + setDescription('') + setTaskType('collection') + setTriggerType('cron') + 
setTriggerConfig(defaultTriggerConfig.cron) + setTeamId(null) + setPromptTemplate('') + setRetryCount(0) + setEnabled(true) + } + }, [flow, open]) + + // Handle trigger type change + const handleTriggerTypeChange = useCallback((value: FlowTriggerType) => { + setTriggerType(value) + setTriggerConfig(defaultTriggerConfig[value]) + }, []) + + // Handle submit + const handleSubmit = useCallback(async () => { + // Validation + if (!displayName.trim()) { + toast.error(t('validation_display_name_required')) + return + } + if (!teamId) { + toast.error(t('validation_team_required')) + return + } + if (!promptTemplate.trim()) { + toast.error(t('validation_prompt_required')) + return + } + + setSubmitting(true) + try { + if (isEditing && flow) { + const updateData: FlowUpdateRequest = { + display_name: displayName, + description: description || undefined, + task_type: taskType, + trigger_type: triggerType, + trigger_config: triggerConfig, + team_id: teamId, + prompt_template: promptTemplate, + retry_count: retryCount, + enabled, + } + await flowApis.updateFlow(flow.id, updateData) + toast.success(t('update_success')) + } else { + // Generate name from display name + const generatedName = displayName + .toLowerCase() + .replace(/[^a-z0-9]+/g, '-') + .replace(/^-|-$/g, '') + .slice(0, 50) || `flow-${Date.now()}` + + const createData: FlowCreateRequest = { + name: generatedName, + display_name: displayName, + description: description || undefined, + task_type: taskType, + trigger_type: triggerType, + trigger_config: triggerConfig, + team_id: teamId, + prompt_template: promptTemplate, + retry_count: retryCount, + enabled, + } + await flowApis.createFlow(createData) + toast.success(t('create_success')) + } + onSuccess() + onOpenChange(false) + } catch (error: any) { + console.error('Failed to save flow:', error) + toast.error(error?.message || t('save_failed')) + } finally { + setSubmitting(false) + } + }, [ + displayName, + description, + taskType, + triggerType, + triggerConfig, 
+ teamId, + promptTemplate, + retryCount, + enabled, + isEditing, + flow, + onSuccess, + onOpenChange, + t, + ]) + + const renderTriggerConfig = () => { + switch (triggerType) { + case 'cron': + return ( +
+
+ + + setTriggerConfig({ ...triggerConfig, expression: e.target.value }) + } + placeholder="0 9 * * *" + /> +

+ {t('cron_hint')} +

+
+
+ ) + case 'interval': + return ( +
+
+ + + setTriggerConfig({ + ...triggerConfig, + value: parseInt(e.target.value) || 1, + }) + } + /> +
+
+ + +
+
+ ) + case 'one_time': + return ( +
+ + + setTriggerConfig({ + ...triggerConfig, + execute_at: new Date(e.target.value).toISOString(), + }) + } + /> +
+ ) + case 'event': + return ( +
+ + + {triggerConfig.event_type === 'git_push' && ( +
+
+ + + setTriggerConfig({ + ...triggerConfig, + git_push: { + ...(triggerConfig.git_push as any), + repository: e.target.value, + }, + }) + } + placeholder="owner/repo" + /> +
+
+ + + setTriggerConfig({ + ...triggerConfig, + git_push: { + ...(triggerConfig.git_push as any), + branch: e.target.value, + }, + }) + } + placeholder="main" + /> +
+
+ )} +
+ ) + default: + return null + } + } + + return ( + + + + + {isEditing ? t('edit_flow') : t('create_flow')} + + + {isEditing ? t('edit_flow_desc') : t('create_flow_desc')} + + + +
+ {/* Display Name */} +
+ + setDisplayName(e.target.value)} + placeholder={t('display_name_placeholder')} + /> +
+ + {/* Description */} +
+ + setDescription(e.target.value)} + placeholder={t('description_placeholder')} + /> +
+ + {/* Task Type */} +
+ + +
+ + {/* Trigger Type */} +
+ + +
+ + {/* Trigger Config */} +
+
{t('trigger_config')}
+ {renderTriggerConfig()} +
+ + {/* Team Selection */} +
+ + +
+ + {/* Prompt Template */} +
+ +