fix: replace Exception with ValueError for uninitialized workflow run and graph runtime state
Signed-off-by: -LAN- <laipz8200@outlook.com>
This commit is contained in:
parent
d4ddcda3f2
commit
cdc854d15b
@ -218,7 +218,7 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa
|
|||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
yield MessageAudioStreamResponse(audio=audio_trunk.audio, task_id=task_id)
|
yield MessageAudioStreamResponse(audio=audio_trunk.audio, task_id=task_id)
|
||||||
except Exception as e:
|
except Exception:
|
||||||
logger.exception(f"Fails to get audio trunk, task_id: {task_id}")
|
logger.exception(f"Fails to get audio trunk, task_id: {task_id}")
|
||||||
break
|
break
|
||||||
if tts_publisher:
|
if tts_publisher:
|
||||||
@ -258,6 +258,8 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa
|
|||||||
event,
|
event,
|
||||||
QueueNodeRetryEvent,
|
QueueNodeRetryEvent,
|
||||||
):
|
):
|
||||||
|
if not workflow_run:
|
||||||
|
raise ValueError("workflow run not initialized.")
|
||||||
workflow_node_execution = self._handle_workflow_node_execution_retried(
|
workflow_node_execution = self._handle_workflow_node_execution_retried(
|
||||||
workflow_run=workflow_run, event=event
|
workflow_run=workflow_run, event=event
|
||||||
)
|
)
|
||||||
@ -272,7 +274,7 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa
|
|||||||
yield response
|
yield response
|
||||||
elif isinstance(event, QueueNodeStartedEvent):
|
elif isinstance(event, QueueNodeStartedEvent):
|
||||||
if not workflow_run:
|
if not workflow_run:
|
||||||
raise Exception("Workflow run not initialized.")
|
raise ValueError("workflow run not initialized.")
|
||||||
|
|
||||||
workflow_node_execution = self._handle_node_execution_start(workflow_run=workflow_run, event=event)
|
workflow_node_execution = self._handle_node_execution_start(workflow_run=workflow_run, event=event)
|
||||||
|
|
||||||
@ -308,45 +310,45 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa
|
|||||||
|
|
||||||
elif isinstance(event, QueueParallelBranchRunStartedEvent):
|
elif isinstance(event, QueueParallelBranchRunStartedEvent):
|
||||||
if not workflow_run:
|
if not workflow_run:
|
||||||
raise Exception("Workflow run not initialized.")
|
raise ValueError("workflow run not initialized.")
|
||||||
|
|
||||||
yield self._workflow_parallel_branch_start_to_stream_response(
|
yield self._workflow_parallel_branch_start_to_stream_response(
|
||||||
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
|
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
|
||||||
)
|
)
|
||||||
elif isinstance(event, QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent):
|
elif isinstance(event, QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent):
|
||||||
if not workflow_run:
|
if not workflow_run:
|
||||||
raise Exception("Workflow run not initialized.")
|
raise ValueError("workflow run not initialized.")
|
||||||
|
|
||||||
yield self._workflow_parallel_branch_finished_to_stream_response(
|
yield self._workflow_parallel_branch_finished_to_stream_response(
|
||||||
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
|
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
|
||||||
)
|
)
|
||||||
elif isinstance(event, QueueIterationStartEvent):
|
elif isinstance(event, QueueIterationStartEvent):
|
||||||
if not workflow_run:
|
if not workflow_run:
|
||||||
raise Exception("Workflow run not initialized.")
|
raise ValueError("workflow run not initialized.")
|
||||||
|
|
||||||
yield self._workflow_iteration_start_to_stream_response(
|
yield self._workflow_iteration_start_to_stream_response(
|
||||||
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
|
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
|
||||||
)
|
)
|
||||||
elif isinstance(event, QueueIterationNextEvent):
|
elif isinstance(event, QueueIterationNextEvent):
|
||||||
if not workflow_run:
|
if not workflow_run:
|
||||||
raise Exception("Workflow run not initialized.")
|
raise ValueError("workflow run not initialized.")
|
||||||
|
|
||||||
yield self._workflow_iteration_next_to_stream_response(
|
yield self._workflow_iteration_next_to_stream_response(
|
||||||
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
|
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
|
||||||
)
|
)
|
||||||
elif isinstance(event, QueueIterationCompletedEvent):
|
elif isinstance(event, QueueIterationCompletedEvent):
|
||||||
if not workflow_run:
|
if not workflow_run:
|
||||||
raise Exception("Workflow run not initialized.")
|
raise ValueError("workflow run not initialized.")
|
||||||
|
|
||||||
yield self._workflow_iteration_completed_to_stream_response(
|
yield self._workflow_iteration_completed_to_stream_response(
|
||||||
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
|
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
|
||||||
)
|
)
|
||||||
elif isinstance(event, QueueWorkflowSucceededEvent):
|
elif isinstance(event, QueueWorkflowSucceededEvent):
|
||||||
if not workflow_run:
|
if not workflow_run:
|
||||||
raise Exception("Workflow run not initialized.")
|
raise ValueError("workflow run not initialized.")
|
||||||
|
|
||||||
if not graph_runtime_state:
|
if not graph_runtime_state:
|
||||||
raise Exception("Graph runtime state not initialized.")
|
raise ValueError("graph runtime state not initialized.")
|
||||||
|
|
||||||
workflow_run = self._handle_workflow_run_success(
|
workflow_run = self._handle_workflow_run_success(
|
||||||
workflow_run=workflow_run,
|
workflow_run=workflow_run,
|
||||||
@ -366,10 +368,10 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa
|
|||||||
)
|
)
|
||||||
elif isinstance(event, QueueWorkflowPartialSuccessEvent):
|
elif isinstance(event, QueueWorkflowPartialSuccessEvent):
|
||||||
if not workflow_run:
|
if not workflow_run:
|
||||||
raise Exception("Workflow run not initialized.")
|
raise ValueError("workflow run not initialized.")
|
||||||
|
|
||||||
if not graph_runtime_state:
|
if not graph_runtime_state:
|
||||||
raise Exception("Graph runtime state not initialized.")
|
raise ValueError("graph runtime state not initialized.")
|
||||||
|
|
||||||
workflow_run = self._handle_workflow_run_partial_success(
|
workflow_run = self._handle_workflow_run_partial_success(
|
||||||
workflow_run=workflow_run,
|
workflow_run=workflow_run,
|
||||||
@ -390,10 +392,10 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa
|
|||||||
)
|
)
|
||||||
elif isinstance(event, QueueWorkflowFailedEvent | QueueStopEvent):
|
elif isinstance(event, QueueWorkflowFailedEvent | QueueStopEvent):
|
||||||
if not workflow_run:
|
if not workflow_run:
|
||||||
raise Exception("Workflow run not initialized.")
|
raise ValueError("workflow run not initialized.")
|
||||||
|
|
||||||
if not graph_runtime_state:
|
if not graph_runtime_state:
|
||||||
raise Exception("Graph runtime state not initialized.")
|
raise ValueError("graph runtime state not initialized.")
|
||||||
workflow_run = self._handle_workflow_run_failed(
|
workflow_run = self._handle_workflow_run_failed(
|
||||||
workflow_run=workflow_run,
|
workflow_run=workflow_run,
|
||||||
start_at=graph_runtime_state.start_at,
|
start_at=graph_runtime_state.start_at,
|
||||||
|
Loading…
Reference in New Issue
Block a user