fix: remove the unused retry index field #11903

Merged (12 commits) on Dec 23, 2024
4 changes: 2 additions & 2 deletions api/controllers/console/app/workflow.py
@@ -15,7 +15,7 @@
from core.app.entities.app_invoke_entities import InvokeFrom
from factories import variable_factory
from fields.workflow_fields import workflow_fields
from fields.workflow_run_fields import workflow_run_node_execution_fields
from fields.workflow_run_fields import single_step_node_execution_fields
from libs import helper
from libs.helper import TimestampField, uuid_value
from libs.login import current_user, login_required
@@ -285,7 +285,7 @@ class DraftWorkflowNodeRunApi(Resource):
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
@marshal_with(workflow_run_node_execution_fields)
@marshal_with(single_step_node_execution_fields)
def post(self, app_model: App, node_id: str):
"""
Run draft workflow node
31 changes: 16 additions & 15 deletions api/core/app/apps/advanced_chat/generate_task_pipeline.py
@@ -291,6 +291,22 @@ def _process_stream_response(
yield self._workflow_start_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
)
elif isinstance(
event,
QueueNodeRetryEvent,
):
workflow_node_execution = self._handle_workflow_node_execution_retried(
workflow_run=workflow_run, event=event
)

response = self._workflow_node_retry_to_stream_response(
event=event,
task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution,
)

if response:
yield response
elif isinstance(event, QueueNodeStartedEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
@@ -331,22 +347,7 @@ def _process_stream_response(

if response:
yield response
elif isinstance(
event,
QueueNodeRetryEvent,
):
workflow_node_execution = self._handle_workflow_node_execution_retried(
workflow_run=workflow_run, event=event
)

response = self._workflow_node_retry_to_stream_response(
event=event,
task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution,
)

if response:
yield response
elif isinstance(event, QueueParallelBranchRunStartedEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
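Note: the `QueueNodeRetryEvent` branch moves above the `QueueNodeStartedEvent` branch because `QueueNodeRetryEvent` now subclasses `QueueNodeStartedEvent` (see api/core/app/entities/queue_entities.py below); an `isinstance` check against the parent class would otherwise swallow retry events. A minimal self-contained sketch of the ordering pitfall, with placeholder classes:

```python
# Placeholder classes standing in for the real queue events; isinstance() is
# true for a parent class, so the subclass branch must be checked first.
class QueueNodeStartedEvent: ...
class QueueNodeRetryEvent(QueueNodeStartedEvent): ...

def dispatch(event) -> str:
    if isinstance(event, QueueNodeRetryEvent):      # subclass check first
        return "retry"
    elif isinstance(event, QueueNodeStartedEvent):  # parent check second
        return "started"
    return "other"

assert dispatch(QueueNodeRetryEvent()) == "retry"
assert dispatch(QueueNodeStartedEvent()) == "started"
```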
32 changes: 16 additions & 16 deletions api/core/app/apps/workflow/generate_task_pipeline.py
@@ -254,6 +254,22 @@ def _process_stream_response(
yield self._workflow_start_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
)
elif isinstance(
event,
QueueNodeRetryEvent,
):
workflow_node_execution = self._handle_workflow_node_execution_retried(
workflow_run=workflow_run, event=event
)

response = self._workflow_node_retry_to_stream_response(
event=event,
task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution,
)

if response:
yield response
elif isinstance(event, QueueNodeStartedEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
@@ -289,22 +305,6 @@
)
if node_failed_response:
yield node_failed_response
elif isinstance(
event,
QueueNodeRetryEvent,
):
workflow_node_execution = self._handle_workflow_node_execution_retried(
workflow_run=workflow_run, event=event
)

response = self._workflow_node_retry_to_stream_response(
event=event,
task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution,
)

if response:
yield response

elif isinstance(event, QueueParallelBranchRunStartedEvent):
if not workflow_run:
62 changes: 32 additions & 30 deletions api/core/app/apps/workflow_app_runner.py
@@ -188,6 +188,38 @@ def _handle_event(self, workflow_entry: WorkflowEntry, event: GraphEngineEvent)
)
elif isinstance(event, GraphRunFailedEvent):
self._publish_event(QueueWorkflowFailedEvent(error=event.error, exceptions_count=event.exceptions_count))
elif isinstance(event, NodeRunRetryEvent):
self._publish_event(
QueueNodeRetryEvent(
node_execution_id=event.id,
node_id=event.node_id,
node_type=event.node_type,
node_data=event.node_data,
parallel_id=event.parallel_id,
parallel_start_node_id=event.parallel_start_node_id,
parent_parallel_id=event.parent_parallel_id,
parent_parallel_start_node_id=event.parent_parallel_start_node_id,
start_at=event.start_at,
node_run_index=event.node_run_index,
predecessor_node_id=event.predecessor_node_id,
in_iteration_id=event.in_iteration_id,
parallel_mode_run_id=event.parallel_mode_run_id,
inputs=event.route_node_state.node_run_result.inputs
if event.route_node_state.node_run_result
else {},
process_data=event.route_node_state.node_run_result.process_data
if event.route_node_state.node_run_result
else {},
outputs=event.route_node_state.node_run_result.outputs
if event.route_node_state.node_run_result
else {},
error=event.error,
execution_metadata=event.route_node_state.node_run_result.metadata
if event.route_node_state.node_run_result
else {},
retry_index=event.retry_index,
)
)
elif isinstance(event, NodeRunStartedEvent):
self._publish_event(
QueueNodeStartedEvent(
@@ -422,36 +454,6 @@ def _handle_event(self, workflow_entry: WorkflowEntry, event: GraphEngineEvent)
error=event.error if isinstance(event, IterationRunFailedEvent) else None,
)
)
elif isinstance(event, NodeRunRetryEvent):
self._publish_event(
QueueNodeRetryEvent(
node_execution_id=event.id,
node_id=event.node_id,
node_type=event.node_type,
node_data=event.node_data,
parallel_id=event.parallel_id,
parallel_start_node_id=event.parallel_start_node_id,
parent_parallel_id=event.parent_parallel_id,
parent_parallel_start_node_id=event.parent_parallel_start_node_id,
start_at=event.start_at,
inputs=event.route_node_state.node_run_result.inputs
if event.route_node_state.node_run_result
else {},
process_data=event.route_node_state.node_run_result.process_data
if event.route_node_state.node_run_result
else {},
outputs=event.route_node_state.node_run_result.outputs
if event.route_node_state.node_run_result
else {},
error=event.error,
execution_metadata=event.route_node_state.node_run_result.metadata
if event.route_node_state.node_run_result
else {},
in_iteration_id=event.in_iteration_id,
retry_index=event.retry_index,
start_index=event.start_index,
)
)

def get_workflow(self, app_model: App, workflow_id: str) -> Optional[Workflow]:
"""
19 changes: 1 addition & 18 deletions api/core/app/entities/queue_entities.py
@@ -314,35 +314,18 @@ class QueueNodeSucceededEvent(AppQueueEvent):
iteration_duration_map: Optional[dict[str, float]] = None


class QueueNodeRetryEvent(AppQueueEvent):
class QueueNodeRetryEvent(QueueNodeStartedEvent):
"""QueueNodeRetryEvent entity"""

event: QueueEvent = QueueEvent.RETRY

node_execution_id: str
node_id: str
node_type: NodeType
node_data: BaseNodeData
parallel_id: Optional[str] = None
"""parallel id if node is in parallel"""
parallel_start_node_id: Optional[str] = None
"""parallel start node id if node is in parallel"""
parent_parallel_id: Optional[str] = None
"""parent parallel id if node is in parallel"""
parent_parallel_start_node_id: Optional[str] = None
"""parent parallel start node id if node is in parallel"""
in_iteration_id: Optional[str] = None
"""iteration id if node is in iteration"""
start_at: datetime

inputs: Optional[dict[str, Any]] = None
process_data: Optional[dict[str, Any]] = None
outputs: Optional[dict[str, Any]] = None
execution_metadata: Optional[dict[NodeRunMetadataKey, Any]] = None

error: str
retry_index: int # retry index
start_index: int # start index


class QueueNodeInIterationFailedEvent(AppQueueEvent):
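The deleted fields are not lost: they are inherited from `QueueNodeStartedEvent`, so the subclass now declares only its retry-specific additions. A self-contained sketch of the pattern with stand-in names (the real classes carry more fields):

```python
from datetime import datetime
from typing import Any, Optional

from pydantic import BaseModel


class NodeStarted(BaseModel):  # stands in for QueueNodeStartedEvent
    node_execution_id: str
    node_id: str
    start_at: datetime
    parallel_id: Optional[str] = None
    in_iteration_id: Optional[str] = None


class NodeRetry(NodeStarted):  # stands in for QueueNodeRetryEvent
    error: str
    retry_index: int
    inputs: Optional[dict[str, Any]] = None
    outputs: Optional[dict[str, Any]] = None


event = NodeRetry(
    node_execution_id="exec-1",
    node_id="node-1",
    start_at=datetime.now(),
    error="timeout",
    retry_index=2,
)
assert isinstance(event, NodeStarted)  # why dispatch order matters upstream
```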
5 changes: 4 additions & 1 deletion api/core/app/task_pipeline/workflow_cycle_manage.py
@@ -445,6 +445,7 @@ def _handle_workflow_node_execution_retried(
workflow_node_execution.workflow_id = workflow_run.workflow_id
workflow_node_execution.triggered_from = WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value
workflow_node_execution.workflow_run_id = workflow_run.id
workflow_node_execution.predecessor_node_id = event.predecessor_node_id
workflow_node_execution.node_execution_id = event.node_execution_id
workflow_node_execution.node_id = event.node_id
workflow_node_execution.node_type = event.node_type.value
@@ -461,9 +462,11 @@
workflow_node_execution.execution_metadata = json.dumps(
{
NodeRunMetadataKey.ITERATION_ID: event.in_iteration_id,
NodeRunMetadataKey.PARALLEL_MODE_RUN_ID: event.parallel_mode_run_id,
NodeRunMetadataKey.ITERATION_ID: event.in_iteration_id,
}
)
workflow_node_execution.index = event.start_index
workflow_node_execution.index = event.node_run_index

db.session.add(workflow_node_execution)
db.session.commit()
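The metadata dict is serialized with `NodeRunMetadataKey` members as keys; this works because the key type is, as elsewhere in this codebase, presumably a str-based enum, which `json.dumps` writes out by its string value. A small self-contained sketch of that behavior (enum values here are illustrative):

```python
import json
from enum import Enum


class NodeRunMetadataKey(str, Enum):  # str subclass, so usable as a JSON key
    ITERATION_ID = "iteration_id"
    PARALLEL_MODE_RUN_ID = "parallel_mode_run_id"


meta = json.dumps(
    {
        NodeRunMetadataKey.ITERATION_ID: "iter-1",
        NodeRunMetadataKey.PARALLEL_MODE_RUN_ID: None,
    }
)
print(meta)  # {"iteration_id": "iter-1", "parallel_mode_run_id": null}
```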
6 changes: 4 additions & 2 deletions api/core/helper/ssrf_proxy.py
@@ -63,13 +63,15 @@ def make_request(method, url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs):
logging.warning(f"Received status code {response.status_code} for URL {url} which is in the force list")

except httpx.RequestError as e:
if max_retries == 0:
raise
logging.warning(f"Request to URL {url} failed on attempt {retries + 1}: {e}")

retries += 1
if retries <= max_retries:
time.sleep(BACKOFF_FACTOR * (2 ** (retries - 1)))

raise MaxRetriesExceededError(f"Reached maximum retries ({max_retries}) for URL {url}")
if max_retries != 0:
raise MaxRetriesExceededError(f"Reached maximum retries ({max_retries}) for URL {url}")


def get(url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs):
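With this change, `max_retries=0` makes the helper surface the underlying `httpx.RequestError` directly instead of swallowing it and raising a generic `MaxRetriesExceededError` after the loop. A self-contained sketch of the resulting retry semantics (names mirror the diff; the transport is faked with `ConnectionError`):

```python
import time

BACKOFF_FACTOR = 0.5


class MaxRetriesExceededError(Exception):
    pass


def make_request(send, max_retries=3):
    retries = 0
    while retries <= max_retries:
        try:
            return send()
        except ConnectionError:  # stands in for httpx.RequestError
            if max_retries == 0:
                raise  # surface the real error, not the generic wrapper
            retries += 1
            if retries <= max_retries:
                time.sleep(BACKOFF_FACTOR * (2 ** (retries - 1)))
    raise MaxRetriesExceededError(f"Reached maximum retries ({max_retries})")


def flaky():
    raise ConnectionError("connection refused")


try:
    make_request(flaky, max_retries=0)
except ConnectionError as e:
    print(f"propagated unchanged: {e}")
```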
4 changes: 2 additions & 2 deletions api/core/workflow/graph_engine/entities/event.py
@@ -97,11 +97,11 @@ class NodeInIterationFailedEvent(BaseNodeEvent):
error: str = Field(..., description="error")


class NodeRunRetryEvent(BaseNodeEvent):
class NodeRunRetryEvent(NodeRunStartedEvent):
error: str = Field(..., description="error")
retry_index: int = Field(..., description="which retry attempt is about to be performed")
start_at: datetime = Field(..., description="retry start time")
start_index: int = Field(..., description="retry start index")
node_run_index: int = Field(..., description="retry run index")


###########################################
7 changes: 4 additions & 3 deletions api/core/workflow/graph_engine/graph_engine.py
@@ -649,14 +649,15 @@ def _run_node(
node_type=node_instance.node_type,
node_data=node_instance.node_data,
route_node_state=route_node_state,
error=run_result.error,
retry_index=retries,
predecessor_node_id=node_instance.previous_node_id,
parallel_id=parallel_id,
parallel_start_node_id=parallel_start_node_id,
parent_parallel_id=parent_parallel_id,
parent_parallel_start_node_id=parent_parallel_start_node_id,
error=run_result.error,
retry_index=retries,
start_at=retry_start_at,
start_index=self.graph_runtime_state.node_run_steps,
node_run_index=self.graph_runtime_state.node_run_steps,
)
time.sleep(retry_interval)
continue
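For context, `_run_node` emits the retry event inside a bounded retry loop: publish, sleep, try again. A minimal self-contained sketch of that control flow (names are illustrative, not the engine's API):

```python
import time
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class RunResult:
    ok: bool
    error: Optional[str] = None


def run_with_retries(
    run_once: Callable[[], RunResult],
    max_retries: int,
    retry_interval: float,
    on_retry: Callable[..., None],
) -> RunResult:
    retries = 0
    while True:
        result = run_once()
        if result.ok or retries >= max_retries:
            return result
        retries += 1
        on_retry(error=result.error, retry_index=retries)  # emit retry event
        time.sleep(retry_interval)


attempts = iter([RunResult(False, "timeout"), RunResult(True)])
final = run_with_retries(
    lambda: next(attempts),
    max_retries=3,
    retry_interval=0,
    on_retry=lambda **kw: print("retrying:", kw),
)
assert final.ok
```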
2 changes: 1 addition & 1 deletion api/core/workflow/nodes/event/event.py
@@ -46,7 +46,7 @@ class SingleStepRetryEvent(BaseModel):

inputs: dict | None = Field(..., description="input")
error: str = Field(..., description="error")
outputs: dict = Field(..., description="output")
outputs: dict | None = Field(..., description="output")
retry_index: int = Field(..., description="Retry attempt number")
error: str = Field(..., description="error")
elapsed_time: float = Field(..., description="elapsed time")
2 changes: 2 additions & 0 deletions api/core/workflow/nodes/http_request/executor.py
@@ -251,6 +251,8 @@ def _do_http_request(self, headers: dict[str, Any]) -> httpx.Response:
response = getattr(ssrf_proxy, self.method)(**request_args)
except ssrf_proxy.MaxRetriesExceededError as e:
raise HttpRequestNodeError(str(e))
except httpx.RequestError as e:
raise HttpRequestNodeError(str(e))
return response

def invoke(self) -> Response:
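Since ssrf_proxy can now re-raise `httpx.RequestError` directly (when `max_retries` is 0), the executor catches both exception types and wraps them the same way. The two handlers have identical bodies, so they could equally be collapsed into one clause; a sketch of that alternative, mirroring the diff's context rather than standalone code:

```python
# Equivalent single-clause form of the two handlers above (sketch only).
try:
    response = getattr(ssrf_proxy, self.method)(**request_args)
except (ssrf_proxy.MaxRetriesExceededError, httpx.RequestError) as e:
    raise HttpRequestNodeError(str(e))
```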
4 changes: 4 additions & 0 deletions api/fields/workflow_run_fields.py
@@ -112,6 +112,10 @@
"created_by_account": fields.Nested(simple_account_fields, attribute="created_by_account", allow_null=True),
"created_by_end_user": fields.Nested(simple_end_user_fields, attribute="created_by_end_user", allow_null=True),
"finished_at": TimestampField,
}

single_step_node_execution_fields = {
**workflow_run_node_execution_fields,
"retry_events": fields.List(fields.Nested(retry_event_field)),
}

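`single_step_node_execution_fields` reuses the shared mapping via dict unpacking and adds only the `retry_events` list, so retry details are serialized solely by the single-step debug endpoint. A runnable sketch of the Flask-RESTful pattern with illustrative field names:

```python
from flask_restful import fields, marshal

base_fields = {
    "id": fields.String,
    "status": fields.String,
}

retry_event_field = {
    "retry_index": fields.Integer,
    "error": fields.String,
}

single_step_fields = {
    **base_fields,  # inherit every base field
    "retry_events": fields.List(fields.Nested(retry_event_field)),
}

record = {
    "id": "node-1",
    "status": "succeeded",
    "retry_events": [{"retry_index": 1, "error": "timeout"}],
}
print(marshal(record, single_step_fields))
```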

This file was deleted.

1 change: 0 additions & 1 deletion api/models/workflow.py
@@ -640,7 +640,6 @@ class WorkflowNodeExecution(db.Model):
created_by_role = db.Column(db.String(255), nullable=False)
created_by = db.Column(StringUUID, nullable=False)
finished_at = db.Column(db.DateTime)
retry_index = db.Column(db.Integer, server_default=db.text("0"))

@property
def created_by_account(self):