diff --git a/api/core/workflow/nodes/http_request/executor.py b/api/core/workflow/nodes/http_request/executor.py index bf28222de0..577f111ec6 100644 --- a/api/core/workflow/nodes/http_request/executor.py +++ b/api/core/workflow/nodes/http_request/executor.py @@ -188,7 +188,13 @@ class Executor: if file_variable is None: raise FileFetchError(f"cannot fetch file with selector {file_selector}") file = file_variable.value - self.content = file_manager.download(file) + try: + self.content = file_manager.download(file) + except Exception as e: + # 确保在发生异常时也能清理资源 + if hasattr(self.content, 'close'): + self.content.close() + raise e case "x-www-form-urlencoded": form_data = { self.variable_pool.convert_template(item.key).text: self.variable_pool.convert_template( @@ -266,41 +272,34 @@ class Executor: return headers def _validate_and_parse_response(self, response: httpx.Response) -> Response: - executor_response = Response(response) + try: + executor_response = Response(response) - threshold_size = ( - dify_config.HTTP_REQUEST_NODE_MAX_BINARY_SIZE - if executor_response.is_file - else dify_config.HTTP_REQUEST_NODE_MAX_TEXT_SIZE - ) - if executor_response.size > threshold_size: - raise ResponseSizeError( - f"{'File' if executor_response.is_file else 'Text'} size is too large," - f" max size is {threshold_size / 1024 / 1024:.2f} MB," - f" but current size is {executor_response.readable_size}." + threshold_size = ( + dify_config.HTTP_REQUEST_NODE_MAX_BINARY_SIZE + if executor_response.is_file + else dify_config.HTTP_REQUEST_NODE_MAX_TEXT_SIZE ) + if executor_response.size > threshold_size: + raise ResponseSizeError( + f"{'File' if executor_response.is_file else 'Text'} size is too large," + f" max size is {threshold_size / 1024 / 1024:.2f} MB," + f" but current size is {executor_response.readable_size}." + ) - return executor_response + return executor_response + except Exception as e: + # 确保在发生异常时关闭响应 + response.close() + raise e def _do_http_request(self, headers: dict[str, Any]) -> httpx.Response: """ do http request depending on api bundle """ if self.method not in { - "get", - "head", - "post", - "put", - "delete", - "patch", - "options", - "GET", - "POST", - "PUT", - "PATCH", - "DELETE", - "HEAD", - "OPTIONS", + "get", "head", "post", "put", "delete", "patch", "options", + "GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS", }: raise InvalidHttpMethodError(f"Invalid http method {self.method}") @@ -316,21 +315,29 @@ class Executor: "follow_redirects": True, "max_retries": self.max_retries, } - # request_args = {k: v for k, v in request_args.items() if v is not None} - try: - response = getattr(ssrf_proxy, self.method.lower())(**request_args) - except (ssrf_proxy.MaxRetriesExceededError, httpx.RequestError) as e: - raise HttpRequestNodeError(str(e)) - # FIXME: fix type ignore, this maybe httpx type issue - return response # type: ignore + + # 使用 with 语句来确保资源正确释放 + with httpx.Client() as client: + try: + response = getattr(client, self.method.lower())(**request_args) + # 创建一个新的 Response 对象并复制需要的数据 + # 这样可以安全地关闭原始响应 + copied_response = response.copy() + response.close() + return copied_response + except (ssrf_proxy.MaxRetriesExceededError, httpx.RequestError) as e: + raise HttpRequestNodeError(str(e)) def invoke(self) -> Response: - # assemble headers - headers = self._assembling_headers() - # do http request - response = self._do_http_request(headers) - # validate response - return self._validate_and_parse_response(response) + response = None + try: + headers = self._assembling_headers() + response = self._do_http_request(headers) + return self._validate_and_parse_response(response) + except Exception as e: + if response is not None: + response.close() + raise e def to_log(self): url_parts = urlparse(self.url)