Skip to content

Commit c1aa55f

Browse files
Nov1c444Novice Leelaipz8200
authored
fix: remove the unused retry index field (#11903)
Signed-off-by: -LAN- <[email protected]> Co-authored-by: Novice Lee <[email protected]> Co-authored-by: -LAN- <[email protected]>
1 parent 4b1e13e commit c1aa55f

16 files changed

+257
-269
lines changed

api/core/app/apps/advanced_chat/generate_task_pipeline.py

+32-29
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ def _to_blocking_response(self, generator: Generator[StreamResponse, None, None]
180180
else:
181181
continue
182182

183-
raise Exception("Queue listening stopped unexpectedly.")
183+
raise ValueError("queue listening stopped unexpectedly.")
184184

185185
def _to_stream_response(
186186
self, generator: Generator[StreamResponse, None, None]
@@ -291,9 +291,27 @@ def _process_stream_response(
291291
yield self._workflow_start_to_stream_response(
292292
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
293293
)
294+
elif isinstance(
295+
event,
296+
QueueNodeRetryEvent,
297+
):
298+
if not workflow_run:
299+
raise ValueError("workflow run not initialized.")
300+
workflow_node_execution = self._handle_workflow_node_execution_retried(
301+
workflow_run=workflow_run, event=event
302+
)
303+
304+
response = self._workflow_node_retry_to_stream_response(
305+
event=event,
306+
task_id=self._application_generate_entity.task_id,
307+
workflow_node_execution=workflow_node_execution,
308+
)
309+
310+
if response:
311+
yield response
294312
elif isinstance(event, QueueNodeStartedEvent):
295313
if not workflow_run:
296-
raise Exception("Workflow run not initialized.")
314+
raise ValueError("workflow run not initialized.")
297315

298316
workflow_node_execution = self._handle_node_execution_start(workflow_run=workflow_run, event=event)
299317

@@ -331,63 +349,48 @@ def _process_stream_response(
331349

332350
if response:
333351
yield response
334-
elif isinstance(
335-
event,
336-
QueueNodeRetryEvent,
337-
):
338-
workflow_node_execution = self._handle_workflow_node_execution_retried(
339-
workflow_run=workflow_run, event=event
340-
)
341352

342-
response = self._workflow_node_retry_to_stream_response(
343-
event=event,
344-
task_id=self._application_generate_entity.task_id,
345-
workflow_node_execution=workflow_node_execution,
346-
)
347-
348-
if response:
349-
yield response
350353
elif isinstance(event, QueueParallelBranchRunStartedEvent):
351354
if not workflow_run:
352-
raise Exception("Workflow run not initialized.")
355+
raise ValueError("workflow run not initialized.")
353356

354357
yield self._workflow_parallel_branch_start_to_stream_response(
355358
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
356359
)
357360
elif isinstance(event, QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent):
358361
if not workflow_run:
359-
raise Exception("Workflow run not initialized.")
362+
raise ValueError("workflow run not initialized.")
360363

361364
yield self._workflow_parallel_branch_finished_to_stream_response(
362365
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
363366
)
364367
elif isinstance(event, QueueIterationStartEvent):
365368
if not workflow_run:
366-
raise Exception("Workflow run not initialized.")
369+
raise ValueError("workflow run not initialized.")
367370

368371
yield self._workflow_iteration_start_to_stream_response(
369372
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
370373
)
371374
elif isinstance(event, QueueIterationNextEvent):
372375
if not workflow_run:
373-
raise Exception("Workflow run not initialized.")
376+
raise ValueError("workflow run not initialized.")
374377

375378
yield self._workflow_iteration_next_to_stream_response(
376379
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
377380
)
378381
elif isinstance(event, QueueIterationCompletedEvent):
379382
if not workflow_run:
380-
raise Exception("Workflow run not initialized.")
383+
raise ValueError("workflow run not initialized.")
381384

382385
yield self._workflow_iteration_completed_to_stream_response(
383386
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
384387
)
385388
elif isinstance(event, QueueWorkflowSucceededEvent):
386389
if not workflow_run:
387-
raise Exception("Workflow run not initialized.")
390+
raise ValueError("workflow run not initialized.")
388391

389392
if not graph_runtime_state:
390-
raise Exception("Graph runtime state not initialized.")
393+
raise ValueError("workflow run not initialized.")
391394

392395
workflow_run = self._handle_workflow_run_success(
393396
workflow_run=workflow_run,
@@ -406,10 +409,10 @@ def _process_stream_response(
406409
self._queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE)
407410
elif isinstance(event, QueueWorkflowPartialSuccessEvent):
408411
if not workflow_run:
409-
raise Exception("Workflow run not initialized.")
412+
raise ValueError("workflow run not initialized.")
410413

411414
if not graph_runtime_state:
412-
raise Exception("Graph runtime state not initialized.")
415+
raise ValueError("graph runtime state not initialized.")
413416

414417
workflow_run = self._handle_workflow_run_partial_success(
415418
workflow_run=workflow_run,
@@ -429,10 +432,10 @@ def _process_stream_response(
429432
self._queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE)
430433
elif isinstance(event, QueueWorkflowFailedEvent):
431434
if not workflow_run:
432-
raise Exception("Workflow run not initialized.")
435+
raise ValueError("workflow run not initialized.")
433436

434437
if not graph_runtime_state:
435-
raise Exception("Graph runtime state not initialized.")
438+
raise ValueError("graph runtime state not initialized.")
436439

437440
workflow_run = self._handle_workflow_run_failed(
438441
workflow_run=workflow_run,
@@ -522,7 +525,7 @@ def _process_stream_response(
522525
yield self._message_replace_to_stream_response(answer=event.text)
523526
elif isinstance(event, QueueAdvancedChatMessageEndEvent):
524527
if not graph_runtime_state:
525-
raise Exception("Graph runtime state not initialized.")
528+
raise ValueError("graph runtime state not initialized.")
526529

527530
output_moderation_answer = self._handle_output_moderation_when_task_finished(self._task_state.answer)
528531
if output_moderation_answer:

api/core/app/apps/workflow/generate_task_pipeline.py

+32-30
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ def _to_blocking_response(self, generator: Generator[StreamResponse, None, None]
155155
else:
156156
continue
157157

158-
raise Exception("Queue listening stopped unexpectedly.")
158+
raise ValueError("queue listening stopped unexpectedly.")
159159

160160
def _to_stream_response(
161161
self, generator: Generator[StreamResponse, None, None]
@@ -218,7 +218,7 @@ def _wrapper_process_stream_response(
218218
break
219219
else:
220220
yield MessageAudioStreamResponse(audio=audio_trunk.audio, task_id=task_id)
221-
except Exception as e:
221+
except Exception:
222222
logger.exception(f"Fails to get audio trunk, task_id: {task_id}")
223223
break
224224
if tts_publisher:
@@ -254,9 +254,27 @@ def _process_stream_response(
254254
yield self._workflow_start_to_stream_response(
255255
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
256256
)
257+
elif isinstance(
258+
event,
259+
QueueNodeRetryEvent,
260+
):
261+
if not workflow_run:
262+
raise ValueError("workflow run not initialized.")
263+
workflow_node_execution = self._handle_workflow_node_execution_retried(
264+
workflow_run=workflow_run, event=event
265+
)
266+
267+
response = self._workflow_node_retry_to_stream_response(
268+
event=event,
269+
task_id=self._application_generate_entity.task_id,
270+
workflow_node_execution=workflow_node_execution,
271+
)
272+
273+
if response:
274+
yield response
257275
elif isinstance(event, QueueNodeStartedEvent):
258276
if not workflow_run:
259-
raise Exception("Workflow run not initialized.")
277+
raise ValueError("workflow run not initialized.")
260278

261279
workflow_node_execution = self._handle_node_execution_start(workflow_run=workflow_run, event=event)
262280

@@ -289,64 +307,48 @@ def _process_stream_response(
289307
)
290308
if node_failed_response:
291309
yield node_failed_response
292-
elif isinstance(
293-
event,
294-
QueueNodeRetryEvent,
295-
):
296-
workflow_node_execution = self._handle_workflow_node_execution_retried(
297-
workflow_run=workflow_run, event=event
298-
)
299-
300-
response = self._workflow_node_retry_to_stream_response(
301-
event=event,
302-
task_id=self._application_generate_entity.task_id,
303-
workflow_node_execution=workflow_node_execution,
304-
)
305-
306-
if response:
307-
yield response
308310

309311
elif isinstance(event, QueueParallelBranchRunStartedEvent):
310312
if not workflow_run:
311-
raise Exception("Workflow run not initialized.")
313+
raise ValueError("workflow run not initialized.")
312314

313315
yield self._workflow_parallel_branch_start_to_stream_response(
314316
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
315317
)
316318
elif isinstance(event, QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent):
317319
if not workflow_run:
318-
raise Exception("Workflow run not initialized.")
320+
raise ValueError("workflow run not initialized.")
319321

320322
yield self._workflow_parallel_branch_finished_to_stream_response(
321323
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
322324
)
323325
elif isinstance(event, QueueIterationStartEvent):
324326
if not workflow_run:
325-
raise Exception("Workflow run not initialized.")
327+
raise ValueError("workflow run not initialized.")
326328

327329
yield self._workflow_iteration_start_to_stream_response(
328330
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
329331
)
330332
elif isinstance(event, QueueIterationNextEvent):
331333
if not workflow_run:
332-
raise Exception("Workflow run not initialized.")
334+
raise ValueError("workflow run not initialized.")
333335

334336
yield self._workflow_iteration_next_to_stream_response(
335337
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
336338
)
337339
elif isinstance(event, QueueIterationCompletedEvent):
338340
if not workflow_run:
339-
raise Exception("Workflow run not initialized.")
341+
raise ValueError("workflow run not initialized.")
340342

341343
yield self._workflow_iteration_completed_to_stream_response(
342344
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
343345
)
344346
elif isinstance(event, QueueWorkflowSucceededEvent):
345347
if not workflow_run:
346-
raise Exception("Workflow run not initialized.")
348+
raise ValueError("workflow run not initialized.")
347349

348350
if not graph_runtime_state:
349-
raise Exception("Graph runtime state not initialized.")
351+
raise ValueError("graph runtime state not initialized.")
350352

351353
workflow_run = self._handle_workflow_run_success(
352354
workflow_run=workflow_run,
@@ -366,10 +368,10 @@ def _process_stream_response(
366368
)
367369
elif isinstance(event, QueueWorkflowPartialSuccessEvent):
368370
if not workflow_run:
369-
raise Exception("Workflow run not initialized.")
371+
raise ValueError("workflow run not initialized.")
370372

371373
if not graph_runtime_state:
372-
raise Exception("Graph runtime state not initialized.")
374+
raise ValueError("graph runtime state not initialized.")
373375

374376
workflow_run = self._handle_workflow_run_partial_success(
375377
workflow_run=workflow_run,
@@ -390,10 +392,10 @@ def _process_stream_response(
390392
)
391393
elif isinstance(event, QueueWorkflowFailedEvent | QueueStopEvent):
392394
if not workflow_run:
393-
raise Exception("Workflow run not initialized.")
395+
raise ValueError("workflow run not initialized.")
394396

395397
if not graph_runtime_state:
396-
raise Exception("Graph runtime state not initialized.")
398+
raise ValueError("graph runtime state not initialized.")
397399
workflow_run = self._handle_workflow_run_failed(
398400
workflow_run=workflow_run,
399401
start_at=graph_runtime_state.start_at,

0 commit comments

Comments
 (0)