From fc15b95b81ef79ddb9128b23d90d12483777abe6 Mon Sep 17 00:00:00 2001 From: -LAN- Date: Mon, 23 Dec 2024 12:14:28 +0800 Subject: [PATCH] fix: replace Exception with ValueError for uninitialized workflow run and queue listening errors Signed-off-by: -LAN- --- .../advanced_chat/generate_task_pipeline.py | 30 ++++++++++--------- .../apps/workflow/generate_task_pipeline.py | 2 +- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index c7bf37dd08..635e482ad9 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -180,7 +180,7 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc else: continue - raise Exception("Queue listening stopped unexpectedly.") + raise ValueError("queue listening stopped unexpectedly.") def _to_stream_response( self, generator: Generator[StreamResponse, None, None] @@ -295,6 +295,8 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc event, QueueNodeRetryEvent, ): + if not workflow_run: + raise ValueError("workflow run not initialized.") workflow_node_execution = self._handle_workflow_node_execution_retried( workflow_run=workflow_run, event=event ) @@ -309,7 +311,7 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc yield response elif isinstance(event, QueueNodeStartedEvent): if not workflow_run: - raise Exception("Workflow run not initialized.") + raise ValueError("workflow run not initialized.") workflow_node_execution = self._handle_node_execution_start(workflow_run=workflow_run, event=event) @@ -350,45 +352,45 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc elif isinstance(event, QueueParallelBranchRunStartedEvent): if not workflow_run: - raise Exception("Workflow run not initialized.") + raise ValueError("workflow run not initialized.") yield self._workflow_parallel_branch_start_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event ) elif isinstance(event, QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent): if not workflow_run: - raise Exception("Workflow run not initialized.") + raise ValueError("workflow run not initialized.") yield self._workflow_parallel_branch_finished_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event ) elif isinstance(event, QueueIterationStartEvent): if not workflow_run: - raise Exception("Workflow run not initialized.") + raise ValueError("workflow run not initialized.") yield self._workflow_iteration_start_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event ) elif isinstance(event, QueueIterationNextEvent): if not workflow_run: - raise Exception("Workflow run not initialized.") + raise ValueError("workflow run not initialized.") yield self._workflow_iteration_next_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event ) elif isinstance(event, QueueIterationCompletedEvent): if not workflow_run: - raise Exception("Workflow run not initialized.") + raise ValueError("workflow run not initialized.") yield self._workflow_iteration_completed_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event ) elif isinstance(event, QueueWorkflowSucceededEvent): if not workflow_run: - raise Exception("Workflow run not initialized.") + raise ValueError("workflow run not initialized.") if not graph_runtime_state: - raise Exception("Graph runtime state not initialized.") + raise ValueError("workflow run not initialized.") workflow_run = self._handle_workflow_run_success( workflow_run=workflow_run, @@ -407,10 +409,10 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc self._queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE) elif isinstance(event, QueueWorkflowPartialSuccessEvent): if not workflow_run: - raise Exception("Workflow run not initialized.") + raise ValueError("workflow run not initialized.") if not graph_runtime_state: - raise Exception("Graph runtime state not initialized.") + raise ValueError("graph runtime state not initialized.") workflow_run = self._handle_workflow_run_partial_success( workflow_run=workflow_run, @@ -430,10 +432,10 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc self._queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE) elif isinstance(event, QueueWorkflowFailedEvent): if not workflow_run: - raise Exception("Workflow run not initialized.") + raise ValueError("workflow run not initialized.") if not graph_runtime_state: - raise Exception("Graph runtime state not initialized.") + raise ValueError("graph runtime state not initialized.") workflow_run = self._handle_workflow_run_failed( workflow_run=workflow_run, @@ -523,7 +525,7 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc yield self._message_replace_to_stream_response(answer=event.text) elif isinstance(event, QueueAdvancedChatMessageEndEvent): if not graph_runtime_state: - raise Exception("Graph runtime state not initialized.") + raise ValueError("graph runtime state not initialized.") output_moderation_answer = self._handle_output_moderation_when_task_finished(self._task_state.answer) if output_moderation_answer: diff --git a/api/core/app/apps/workflow/generate_task_pipeline.py b/api/core/app/apps/workflow/generate_task_pipeline.py index bd63c4ec7a..c47b38f560 100644 --- a/api/core/app/apps/workflow/generate_task_pipeline.py +++ b/api/core/app/apps/workflow/generate_task_pipeline.py @@ -155,7 +155,7 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa else: continue - raise Exception("Queue listening stopped unexpectedly.") + raise ValueError("queue listening stopped unexpectedly.") def _to_stream_response( self, generator: Generator[StreamResponse, None, None]