fix: properly handle resource cleanup in http request executor
This commit is contained in:
parent
683b4ac615
commit
984499ee96
@ -188,7 +188,13 @@ class Executor:
|
|||||||
if file_variable is None:
|
if file_variable is None:
|
||||||
raise FileFetchError(f"cannot fetch file with selector {file_selector}")
|
raise FileFetchError(f"cannot fetch file with selector {file_selector}")
|
||||||
file = file_variable.value
|
file = file_variable.value
|
||||||
self.content = file_manager.download(file)
|
try:
|
||||||
|
self.content = file_manager.download(file)
|
||||||
|
except Exception as e:
|
||||||
|
# 确保在发生异常时也能清理资源
|
||||||
|
if hasattr(self.content, 'close'):
|
||||||
|
self.content.close()
|
||||||
|
raise e
|
||||||
case "x-www-form-urlencoded":
|
case "x-www-form-urlencoded":
|
||||||
form_data = {
|
form_data = {
|
||||||
self.variable_pool.convert_template(item.key).text: self.variable_pool.convert_template(
|
self.variable_pool.convert_template(item.key).text: self.variable_pool.convert_template(
|
||||||
@ -266,41 +272,34 @@ class Executor:
|
|||||||
return headers
|
return headers
|
||||||
|
|
||||||
def _validate_and_parse_response(self, response: httpx.Response) -> Response:
|
def _validate_and_parse_response(self, response: httpx.Response) -> Response:
|
||||||
executor_response = Response(response)
|
try:
|
||||||
|
executor_response = Response(response)
|
||||||
|
|
||||||
threshold_size = (
|
threshold_size = (
|
||||||
dify_config.HTTP_REQUEST_NODE_MAX_BINARY_SIZE
|
dify_config.HTTP_REQUEST_NODE_MAX_BINARY_SIZE
|
||||||
if executor_response.is_file
|
if executor_response.is_file
|
||||||
else dify_config.HTTP_REQUEST_NODE_MAX_TEXT_SIZE
|
else dify_config.HTTP_REQUEST_NODE_MAX_TEXT_SIZE
|
||||||
)
|
|
||||||
if executor_response.size > threshold_size:
|
|
||||||
raise ResponseSizeError(
|
|
||||||
f"{'File' if executor_response.is_file else 'Text'} size is too large,"
|
|
||||||
f" max size is {threshold_size / 1024 / 1024:.2f} MB,"
|
|
||||||
f" but current size is {executor_response.readable_size}."
|
|
||||||
)
|
)
|
||||||
|
if executor_response.size > threshold_size:
|
||||||
|
raise ResponseSizeError(
|
||||||
|
f"{'File' if executor_response.is_file else 'Text'} size is too large,"
|
||||||
|
f" max size is {threshold_size / 1024 / 1024:.2f} MB,"
|
||||||
|
f" but current size is {executor_response.readable_size}."
|
||||||
|
)
|
||||||
|
|
||||||
return executor_response
|
return executor_response
|
||||||
|
except Exception as e:
|
||||||
|
# 确保在发生异常时关闭响应
|
||||||
|
response.close()
|
||||||
|
raise e
|
||||||
|
|
||||||
def _do_http_request(self, headers: dict[str, Any]) -> httpx.Response:
|
def _do_http_request(self, headers: dict[str, Any]) -> httpx.Response:
|
||||||
"""
|
"""
|
||||||
do http request depending on api bundle
|
do http request depending on api bundle
|
||||||
"""
|
"""
|
||||||
if self.method not in {
|
if self.method not in {
|
||||||
"get",
|
"get", "head", "post", "put", "delete", "patch", "options",
|
||||||
"head",
|
"GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS",
|
||||||
"post",
|
|
||||||
"put",
|
|
||||||
"delete",
|
|
||||||
"patch",
|
|
||||||
"options",
|
|
||||||
"GET",
|
|
||||||
"POST",
|
|
||||||
"PUT",
|
|
||||||
"PATCH",
|
|
||||||
"DELETE",
|
|
||||||
"HEAD",
|
|
||||||
"OPTIONS",
|
|
||||||
}:
|
}:
|
||||||
raise InvalidHttpMethodError(f"Invalid http method {self.method}")
|
raise InvalidHttpMethodError(f"Invalid http method {self.method}")
|
||||||
|
|
||||||
@ -316,21 +315,29 @@ class Executor:
|
|||||||
"follow_redirects": True,
|
"follow_redirects": True,
|
||||||
"max_retries": self.max_retries,
|
"max_retries": self.max_retries,
|
||||||
}
|
}
|
||||||
# request_args = {k: v for k, v in request_args.items() if v is not None}
|
|
||||||
try:
|
# 使用 with 语句来确保资源正确释放
|
||||||
response = getattr(ssrf_proxy, self.method.lower())(**request_args)
|
with httpx.Client() as client:
|
||||||
except (ssrf_proxy.MaxRetriesExceededError, httpx.RequestError) as e:
|
try:
|
||||||
raise HttpRequestNodeError(str(e))
|
response = getattr(client, self.method.lower())(**request_args)
|
||||||
# FIXME: fix type ignore, this maybe httpx type issue
|
# 创建一个新的 Response 对象并复制需要的数据
|
||||||
return response # type: ignore
|
# 这样可以安全地关闭原始响应
|
||||||
|
copied_response = response.copy()
|
||||||
|
response.close()
|
||||||
|
return copied_response
|
||||||
|
except (ssrf_proxy.MaxRetriesExceededError, httpx.RequestError) as e:
|
||||||
|
raise HttpRequestNodeError(str(e))
|
||||||
|
|
||||||
def invoke(self) -> Response:
|
def invoke(self) -> Response:
|
||||||
# assemble headers
|
response = None
|
||||||
headers = self._assembling_headers()
|
try:
|
||||||
# do http request
|
headers = self._assembling_headers()
|
||||||
response = self._do_http_request(headers)
|
response = self._do_http_request(headers)
|
||||||
# validate response
|
return self._validate_and_parse_response(response)
|
||||||
return self._validate_and_parse_response(response)
|
except Exception as e:
|
||||||
|
if response is not None:
|
||||||
|
response.close()
|
||||||
|
raise e
|
||||||
|
|
||||||
def to_log(self):
|
def to_log(self):
|
||||||
url_parts = urlparse(self.url)
|
url_parts = urlparse(self.url)
|
||||||
|
Loading…
Reference in New Issue
Block a user