From 357769c72ea0bddb76ea50ac51bfaf5d761cf567 Mon Sep 17 00:00:00 2001 From: Novice Lee Date: Wed, 18 Dec 2024 15:30:14 +0800 Subject: [PATCH] feat: handle http node retry --- .../workflow/graph_engine/graph_engine.py | 9 +++- api/core/workflow/nodes/enums.py | 3 +- api/core/workflow/nodes/http_request/node.py | 12 +++-- .../workflow/nodes/test_continue_on_error.py | 6 ++- .../core/workflow/nodes/test_retry.py | 53 +++---------------- 5 files changed, 28 insertions(+), 55 deletions(-) diff --git a/api/core/workflow/graph_engine/graph_engine.py b/api/core/workflow/graph_engine/graph_engine.py index 5d400cd438..78147f308e 100644 --- a/api/core/workflow/graph_engine/graph_engine.py +++ b/api/core/workflow/graph_engine/graph_engine.py @@ -632,6 +632,13 @@ class GraphEngine: if isinstance(item, RunCompletedEvent): run_result = item.run_result if run_result.status == WorkflowNodeExecutionStatus.FAILED: + if ( + retries == max_retries + and node_instance.node_type == NodeType.HTTP_REQUEST + and run_result.outputs + and not node_instance.should_continue_on_error + ): + run_result.status = WorkflowNodeExecutionStatus.SUCCEEDED if node_instance.should_retry and retries < max_retries: retries += 1 yield NodeRunRetryEvent( @@ -697,7 +704,7 @@ class GraphEngine: parent_parallel_id=parent_parallel_id, parent_parallel_start_node_id=parent_parallel_start_node_id, ) - + shoudl_continue_retry = False elif run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED: if node_instance.should_continue_on_error and self.graph.edge_mapping.get( node_instance.node_id diff --git a/api/core/workflow/nodes/enums.py b/api/core/workflow/nodes/enums.py index e278a96eb9..32fdc048d1 100644 --- a/api/core/workflow/nodes/enums.py +++ b/api/core/workflow/nodes/enums.py @@ -35,5 +35,4 @@ class FailBranchSourceHandle(StrEnum): CONTINUE_ON_ERROR_NODE_TYPE = [NodeType.LLM, NodeType.CODE, NodeType.TOOL, NodeType.HTTP_REQUEST] -# TODO Remove code node -RETRY_ON_ERROR_NODE_TYPE = [NodeType.LLM, NodeType.CODE, NodeType.TOOL, NodeType.HTTP_REQUEST] +RETRY_ON_ERROR_NODE_TYPE = [NodeType.LLM, NodeType.TOOL, NodeType.HTTP_REQUEST] diff --git a/api/core/workflow/nodes/http_request/node.py b/api/core/workflow/nodes/http_request/node.py index 92ddfcf208..13d107baf9 100644 --- a/api/core/workflow/nodes/http_request/node.py +++ b/api/core/workflow/nodes/http_request/node.py @@ -51,6 +51,11 @@ class HttpRequestNode(BaseNode[HttpRequestNodeData]): "max_write_timeout": dify_config.HTTP_REQUEST_MAX_WRITE_TIMEOUT, }, }, + "retry_config": { + "max_retries": dify_config.SSRF_DEFAULT_MAX_RETRIES, + "retry_interval": 0.5 * (2**2), + "retry_enabled": True, + }, } def _run(self) -> NodeRunResult: @@ -61,8 +66,7 @@ class HttpRequestNode(BaseNode[HttpRequestNodeData]): "timeout": self._get_request_timeout(self.node_data), "variable_pool": self.graph_runtime_state.variable_pool, } - if self.should_retry: - executor_config["max_retries"] = 0 + executor_config["max_retries"] = 0 http_executor = Executor( **executor_config, ) @@ -70,7 +74,7 @@ class HttpRequestNode(BaseNode[HttpRequestNodeData]): response = http_executor.invoke() files = self.extract_files(url=http_executor.url, response=response) - if not response.response.is_success and self.should_continue_on_error: + if not response.response.is_success and (self.should_continue_on_error or self.should_retry): return NodeRunResult( status=WorkflowNodeExecutionStatus.FAILED, outputs={ @@ -82,7 +86,7 @@ class HttpRequestNode(BaseNode[HttpRequestNodeData]): process_data={ "request": http_executor.to_log(), }, - error=f"Request failed with status code {response.status_code}", + error=f"Request failed with status code {response.status_code}\nRaw response:{response.text}", error_type="HTTPResponseCodeError", ) return NodeRunResult( diff --git a/api/tests/unit_tests/core/workflow/nodes/test_continue_on_error.py b/api/tests/unit_tests/core/workflow/nodes/test_continue_on_error.py index 4feae5caf5..2d74be9da9 100644 --- a/api/tests/unit_tests/core/workflow/nodes/test_continue_on_error.py +++ b/api/tests/unit_tests/core/workflow/nodes/test_continue_on_error.py @@ -36,7 +36,10 @@ class ContinueOnErrorTestHelper: @staticmethod def get_http_node( - error_strategy: str = "fail-branch", default_value: dict | None = None, authorization_success: bool = False + error_strategy: str = "fail-branch", + default_value: dict | None = None, + authorization_success: bool = False, + retry_config: dict = {}, ): """Helper method to create a http node configuration""" authorization = ( @@ -67,6 +70,7 @@ class ContinueOnErrorTestHelper: "body": None, "type": "http-request", "error_strategy": error_strategy, + **retry_config, }, } if default_value: diff --git a/api/tests/unit_tests/core/workflow/nodes/test_retry.py b/api/tests/unit_tests/core/workflow/nodes/test_retry.py index b7005b0c57..c232875ce5 100644 --- a/api/tests/unit_tests/core/workflow/nodes/test_retry.py +++ b/api/tests/unit_tests/core/workflow/nodes/test_retry.py @@ -24,23 +24,15 @@ DEFAULT_VALUE_EDGE = [ def test_retry_default_value_partial_success(): """retry default value node with partial success status""" - error_code = """ - def main() -> dict: - return { - "result": 1 / 0, - } - """ - graph_config = { "edges": DEFAULT_VALUE_EDGE, "nodes": [ {"data": {"title": "start", "type": "start", "variables": []}, "id": "start"}, {"data": {"title": "answer", "type": "answer", "answer": "{{#node.result#}}"}, "id": "answer"}, - ContinueOnErrorTestHelper.get_code_node( - error_code, + ContinueOnErrorTestHelper.get_http_node( "default-value", - [{"key": "result", "type": "number", "value": 132123}], - {"retry_config": {"max_retries": 2, "retry_interval": 1, "retry_enabled": True}}, + [{"key": "result", "type": "string", "value": "http node got error response"}], + retry_config={"retry_config": {"max_retries": 2, "retry_interval": 1000, "retry_enabled": True}}, ), ], } @@ -48,43 +40,11 @@ def test_retry_default_value_partial_success(): graph_engine = ContinueOnErrorTestHelper.create_test_graph_engine(graph_config) events = list(graph_engine.run()) assert sum(1 for e in events if isinstance(e, NodeRunRetryEvent)) == 2 - assert events[-1].outputs == {"answer": "132123"} + assert events[-1].outputs == {"answer": "http node got error response"} assert any(isinstance(e, GraphRunPartialSucceededEvent) for e in events) assert len(events) == 11 -def test_retry_success(): - """retry node with success status""" - success_code = """ - count = 0 - def main(): - global count - count += 1 - if count == 1: - raise Exception("First attempt fails") - if count == 2: - return {"result": "success"} - """ - graph_config = { - "edges": DEFAULT_VALUE_EDGE, - "nodes": [ - {"data": {"title": "start", "type": "start", "variables": []}, "id": "start"}, - {"data": {"title": "answer", "type": "answer", "answer": "{{#node.result#}}"}, "id": "answer"}, - ContinueOnErrorTestHelper.get_code_node( - success_code, - None, - None, - {"retry_config": {"max_retries": 2, "retry_interval": 1000, "retry_enabled": True}}, - ), - ], - } - graph_engine = ContinueOnErrorTestHelper.create_test_graph_engine(graph_config) - events = list(graph_engine.run()) - assert sum(1 for e in events if isinstance(e, NodeRunRetryEvent)) == 2 - assert any(isinstance(e, GraphRunSucceededEvent) for e in events) - assert len(events) == 9 - - def test_retry_failed(): """retry failed with success status""" error_code = """ @@ -99,11 +59,10 @@ def test_retry_failed(): "nodes": [ {"data": {"title": "start", "type": "start", "variables": []}, "id": "start"}, {"data": {"title": "answer", "type": "answer", "answer": "{{#node.result#}}"}, "id": "answer"}, - ContinueOnErrorTestHelper.get_code_node( - error_code, + ContinueOnErrorTestHelper.get_http_node( None, None, - {"retry_config": {"max_retries": 2, "retry_interval": 1, "retry_enabled": True}}, + retry_config={"retry_config": {"max_retries": 2, "retry_interval": 1000, "retry_enabled": True}}, ), ], }