feat: handle http node retry
This commit is contained in:
parent
853b9af09c
commit
357769c72e
@ -632,6 +632,13 @@ class GraphEngine:
|
||||
if isinstance(item, RunCompletedEvent):
|
||||
run_result = item.run_result
|
||||
if run_result.status == WorkflowNodeExecutionStatus.FAILED:
|
||||
if (
|
||||
retries == max_retries
|
||||
and node_instance.node_type == NodeType.HTTP_REQUEST
|
||||
and run_result.outputs
|
||||
and not node_instance.should_continue_on_error
|
||||
):
|
||||
run_result.status = WorkflowNodeExecutionStatus.SUCCEEDED
|
||||
if node_instance.should_retry and retries < max_retries:
|
||||
retries += 1
|
||||
yield NodeRunRetryEvent(
|
||||
@ -697,7 +704,7 @@ class GraphEngine:
|
||||
parent_parallel_id=parent_parallel_id,
|
||||
parent_parallel_start_node_id=parent_parallel_start_node_id,
|
||||
)
|
||||
|
||||
shoudl_continue_retry = False
|
||||
elif run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED:
|
||||
if node_instance.should_continue_on_error and self.graph.edge_mapping.get(
|
||||
node_instance.node_id
|
||||
|
@ -35,5 +35,4 @@ class FailBranchSourceHandle(StrEnum):
|
||||
|
||||
|
||||
CONTINUE_ON_ERROR_NODE_TYPE = [NodeType.LLM, NodeType.CODE, NodeType.TOOL, NodeType.HTTP_REQUEST]
|
||||
# TODO Remove code node
|
||||
RETRY_ON_ERROR_NODE_TYPE = [NodeType.LLM, NodeType.CODE, NodeType.TOOL, NodeType.HTTP_REQUEST]
|
||||
RETRY_ON_ERROR_NODE_TYPE = [NodeType.LLM, NodeType.TOOL, NodeType.HTTP_REQUEST]
|
||||
|
@ -51,6 +51,11 @@ class HttpRequestNode(BaseNode[HttpRequestNodeData]):
|
||||
"max_write_timeout": dify_config.HTTP_REQUEST_MAX_WRITE_TIMEOUT,
|
||||
},
|
||||
},
|
||||
"retry_config": {
|
||||
"max_retries": dify_config.SSRF_DEFAULT_MAX_RETRIES,
|
||||
"retry_interval": 0.5 * (2**2),
|
||||
"retry_enabled": True,
|
||||
},
|
||||
}
|
||||
|
||||
def _run(self) -> NodeRunResult:
|
||||
@ -61,8 +66,7 @@ class HttpRequestNode(BaseNode[HttpRequestNodeData]):
|
||||
"timeout": self._get_request_timeout(self.node_data),
|
||||
"variable_pool": self.graph_runtime_state.variable_pool,
|
||||
}
|
||||
if self.should_retry:
|
||||
executor_config["max_retries"] = 0
|
||||
executor_config["max_retries"] = 0
|
||||
http_executor = Executor(
|
||||
**executor_config,
|
||||
)
|
||||
@ -70,7 +74,7 @@ class HttpRequestNode(BaseNode[HttpRequestNodeData]):
|
||||
|
||||
response = http_executor.invoke()
|
||||
files = self.extract_files(url=http_executor.url, response=response)
|
||||
if not response.response.is_success and self.should_continue_on_error:
|
||||
if not response.response.is_success and (self.should_continue_on_error or self.should_retry):
|
||||
return NodeRunResult(
|
||||
status=WorkflowNodeExecutionStatus.FAILED,
|
||||
outputs={
|
||||
@ -82,7 +86,7 @@ class HttpRequestNode(BaseNode[HttpRequestNodeData]):
|
||||
process_data={
|
||||
"request": http_executor.to_log(),
|
||||
},
|
||||
error=f"Request failed with status code {response.status_code}",
|
||||
error=f"Request failed with status code {response.status_code}\nRaw response:{response.text}",
|
||||
error_type="HTTPResponseCodeError",
|
||||
)
|
||||
return NodeRunResult(
|
||||
|
@ -36,7 +36,10 @@ class ContinueOnErrorTestHelper:
|
||||
|
||||
@staticmethod
|
||||
def get_http_node(
|
||||
error_strategy: str = "fail-branch", default_value: dict | None = None, authorization_success: bool = False
|
||||
error_strategy: str = "fail-branch",
|
||||
default_value: dict | None = None,
|
||||
authorization_success: bool = False,
|
||||
retry_config: dict = {},
|
||||
):
|
||||
"""Helper method to create a http node configuration"""
|
||||
authorization = (
|
||||
@ -67,6 +70,7 @@ class ContinueOnErrorTestHelper:
|
||||
"body": None,
|
||||
"type": "http-request",
|
||||
"error_strategy": error_strategy,
|
||||
**retry_config,
|
||||
},
|
||||
}
|
||||
if default_value:
|
||||
|
@ -24,23 +24,15 @@ DEFAULT_VALUE_EDGE = [
|
||||
|
||||
def test_retry_default_value_partial_success():
|
||||
"""retry default value node with partial success status"""
|
||||
error_code = """
|
||||
def main() -> dict:
|
||||
return {
|
||||
"result": 1 / 0,
|
||||
}
|
||||
"""
|
||||
|
||||
graph_config = {
|
||||
"edges": DEFAULT_VALUE_EDGE,
|
||||
"nodes": [
|
||||
{"data": {"title": "start", "type": "start", "variables": []}, "id": "start"},
|
||||
{"data": {"title": "answer", "type": "answer", "answer": "{{#node.result#}}"}, "id": "answer"},
|
||||
ContinueOnErrorTestHelper.get_code_node(
|
||||
error_code,
|
||||
ContinueOnErrorTestHelper.get_http_node(
|
||||
"default-value",
|
||||
[{"key": "result", "type": "number", "value": 132123}],
|
||||
{"retry_config": {"max_retries": 2, "retry_interval": 1, "retry_enabled": True}},
|
||||
[{"key": "result", "type": "string", "value": "http node got error response"}],
|
||||
retry_config={"retry_config": {"max_retries": 2, "retry_interval": 1000, "retry_enabled": True}},
|
||||
),
|
||||
],
|
||||
}
|
||||
@ -48,43 +40,11 @@ def test_retry_default_value_partial_success():
|
||||
graph_engine = ContinueOnErrorTestHelper.create_test_graph_engine(graph_config)
|
||||
events = list(graph_engine.run())
|
||||
assert sum(1 for e in events if isinstance(e, NodeRunRetryEvent)) == 2
|
||||
assert events[-1].outputs == {"answer": "132123"}
|
||||
assert events[-1].outputs == {"answer": "http node got error response"}
|
||||
assert any(isinstance(e, GraphRunPartialSucceededEvent) for e in events)
|
||||
assert len(events) == 11
|
||||
|
||||
|
||||
def test_retry_success():
|
||||
"""retry node with success status"""
|
||||
success_code = """
|
||||
count = 0
|
||||
def main():
|
||||
global count
|
||||
count += 1
|
||||
if count == 1:
|
||||
raise Exception("First attempt fails")
|
||||
if count == 2:
|
||||
return {"result": "success"}
|
||||
"""
|
||||
graph_config = {
|
||||
"edges": DEFAULT_VALUE_EDGE,
|
||||
"nodes": [
|
||||
{"data": {"title": "start", "type": "start", "variables": []}, "id": "start"},
|
||||
{"data": {"title": "answer", "type": "answer", "answer": "{{#node.result#}}"}, "id": "answer"},
|
||||
ContinueOnErrorTestHelper.get_code_node(
|
||||
success_code,
|
||||
None,
|
||||
None,
|
||||
{"retry_config": {"max_retries": 2, "retry_interval": 1000, "retry_enabled": True}},
|
||||
),
|
||||
],
|
||||
}
|
||||
graph_engine = ContinueOnErrorTestHelper.create_test_graph_engine(graph_config)
|
||||
events = list(graph_engine.run())
|
||||
assert sum(1 for e in events if isinstance(e, NodeRunRetryEvent)) == 2
|
||||
assert any(isinstance(e, GraphRunSucceededEvent) for e in events)
|
||||
assert len(events) == 9
|
||||
|
||||
|
||||
def test_retry_failed():
|
||||
"""retry failed with success status"""
|
||||
error_code = """
|
||||
@ -99,11 +59,10 @@ def test_retry_failed():
|
||||
"nodes": [
|
||||
{"data": {"title": "start", "type": "start", "variables": []}, "id": "start"},
|
||||
{"data": {"title": "answer", "type": "answer", "answer": "{{#node.result#}}"}, "id": "answer"},
|
||||
ContinueOnErrorTestHelper.get_code_node(
|
||||
error_code,
|
||||
ContinueOnErrorTestHelper.get_http_node(
|
||||
None,
|
||||
None,
|
||||
{"retry_config": {"max_retries": 2, "retry_interval": 1, "retry_enabled": True}},
|
||||
retry_config={"retry_config": {"max_retries": 2, "retry_interval": 1000, "retry_enabled": True}},
|
||||
),
|
||||
],
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user