From 90d1226a8fc626e55428c060020341ed7f4a1515 Mon Sep 17 00:00:00 2001 From: zhouyy Date: Wed, 12 Mar 2025 18:00:16 +0800 Subject: [PATCH 1/7] fix: properly close pdf reader resources in pdf_extractor --- api/core/rag/extractor/pdf_extractor.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/api/core/rag/extractor/pdf_extractor.py b/api/core/rag/extractor/pdf_extractor.py index 04033dec3f..13c8f57874 100644 --- a/api/core/rag/extractor/pdf_extractor.py +++ b/api/core/rag/extractor/pdf_extractor.py @@ -55,14 +55,18 @@ class PdfExtractor(BaseExtractor): import pypdfium2 # type: ignore with blob.as_bytes_io() as file_path: - pdf_reader = pypdfium2.PdfDocument(file_path, autoclose=True) try: + pdf_reader = pypdfium2.PdfDocument(file_path, autoclose=True) for page_number, page in enumerate(pdf_reader): - text_page = page.get_textpage() - content = text_page.get_text_range() - text_page.close() - page.close() - metadata = {"source": blob.source, "page": page_number} - yield Document(page_content=content, metadata=metadata) + try: + text_page = page.get_textpage() + try: + content = text_page.get_text_range() + metadata = {"source": blob.source, "page": page_number} + yield Document(page_content=content, metadata=metadata) + finally: + text_page.close() + finally: + page.close() finally: pdf_reader.close() From b23c27b8451d81ad305e455ffe617de1d2a3f618 Mon Sep 17 00:00:00 2001 From: zhouyy Date: Wed, 12 Mar 2025 18:11:49 +0800 Subject: [PATCH 2/7] fix: properly handle resource cleanup in http request executor --- .../workflow/nodes/http_request/executor.py | 87 ++++++++++--------- 1 file changed, 47 insertions(+), 40 deletions(-) diff --git a/api/core/workflow/nodes/http_request/executor.py b/api/core/workflow/nodes/http_request/executor.py index 5ed2cd6164..d0f5ee645d 100644 --- a/api/core/workflow/nodes/http_request/executor.py +++ b/api/core/workflow/nodes/http_request/executor.py @@ -187,7 +187,13 @@ class Executor: if file_variable is None: raise FileFetchError(f"cannot fetch file with selector {file_selector}") file = file_variable.value - self.content = file_manager.download(file) + try: + self.content = file_manager.download(file) + except Exception as e: + # 确保在发生异常时也能清理资源 + if hasattr(self.content, 'close'): + self.content.close() + raise e case "x-www-form-urlencoded": form_data = { self.variable_pool.convert_template(item.key).text: self.variable_pool.convert_template( @@ -244,41 +250,34 @@ class Executor: return headers def _validate_and_parse_response(self, response: httpx.Response) -> Response: - executor_response = Response(response) + try: + executor_response = Response(response) - threshold_size = ( - dify_config.HTTP_REQUEST_NODE_MAX_BINARY_SIZE - if executor_response.is_file - else dify_config.HTTP_REQUEST_NODE_MAX_TEXT_SIZE - ) - if executor_response.size > threshold_size: - raise ResponseSizeError( - f"{'File' if executor_response.is_file else 'Text'} size is too large," - f" max size is {threshold_size / 1024 / 1024:.2f} MB," - f" but current size is {executor_response.readable_size}." + threshold_size = ( + dify_config.HTTP_REQUEST_NODE_MAX_BINARY_SIZE + if executor_response.is_file + else dify_config.HTTP_REQUEST_NODE_MAX_TEXT_SIZE ) + if executor_response.size > threshold_size: + raise ResponseSizeError( + f"{'File' if executor_response.is_file else 'Text'} size is too large," + f" max size is {threshold_size / 1024 / 1024:.2f} MB," + f" but current size is {executor_response.readable_size}." + ) - return executor_response + return executor_response + except Exception as e: + # 确保在发生异常时关闭响应 + response.close() + raise e def _do_http_request(self, headers: dict[str, Any]) -> httpx.Response: """ do http request depending on api bundle """ if self.method not in { - "get", - "head", - "post", - "put", - "delete", - "patch", - "options", - "GET", - "POST", - "PUT", - "PATCH", - "DELETE", - "HEAD", - "OPTIONS", + "get", "head", "post", "put", "delete", "patch", "options", + "GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS", }: raise InvalidHttpMethodError(f"Invalid http method {self.method}") @@ -294,21 +293,29 @@ class Executor: "follow_redirects": True, "max_retries": self.max_retries, } - # request_args = {k: v for k, v in request_args.items() if v is not None} - try: - response = getattr(ssrf_proxy, self.method.lower())(**request_args) - except (ssrf_proxy.MaxRetriesExceededError, httpx.RequestError) as e: - raise HttpRequestNodeError(str(e)) - # FIXME: fix type ignore, this maybe httpx type issue - return response # type: ignore + + # 使用 with 语句来确保资源正确释放 + with httpx.Client() as client: + try: + response = getattr(client, self.method.lower())(**request_args) + # 创建一个新的 Response 对象并复制需要的数据 + # 这样可以安全地关闭原始响应 + copied_response = response.copy() + response.close() + return copied_response + except (ssrf_proxy.MaxRetriesExceededError, httpx.RequestError) as e: + raise HttpRequestNodeError(str(e)) def invoke(self) -> Response: - # assemble headers - headers = self._assembling_headers() - # do http request - response = self._do_http_request(headers) - # validate response - return self._validate_and_parse_response(response) + response = None + try: + headers = self._assembling_headers() + response = self._do_http_request(headers) + return self._validate_and_parse_response(response) + except Exception as e: + if response is not None: + response.close() + raise e def to_log(self): url_parts = urlparse(self.url) From 683b4ac61537abf78ddbe08460200318fb1842d7 Mon Sep 17 00:00:00 2001 From: zhouyy Date: Wed, 12 Mar 2025 18:00:16 +0800 Subject: [PATCH 3/7] fix: properly close pdf reader resources in pdf_extractor --- api/core/rag/extractor/pdf_extractor.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/api/core/rag/extractor/pdf_extractor.py b/api/core/rag/extractor/pdf_extractor.py index 04033dec3f..13c8f57874 100644 --- a/api/core/rag/extractor/pdf_extractor.py +++ b/api/core/rag/extractor/pdf_extractor.py @@ -55,14 +55,18 @@ class PdfExtractor(BaseExtractor): import pypdfium2 # type: ignore with blob.as_bytes_io() as file_path: - pdf_reader = pypdfium2.PdfDocument(file_path, autoclose=True) try: + pdf_reader = pypdfium2.PdfDocument(file_path, autoclose=True) for page_number, page in enumerate(pdf_reader): - text_page = page.get_textpage() - content = text_page.get_text_range() - text_page.close() - page.close() - metadata = {"source": blob.source, "page": page_number} - yield Document(page_content=content, metadata=metadata) + try: + text_page = page.get_textpage() + try: + content = text_page.get_text_range() + metadata = {"source": blob.source, "page": page_number} + yield Document(page_content=content, metadata=metadata) + finally: + text_page.close() + finally: + page.close() finally: pdf_reader.close() From 984499ee965af05b4e449024f6c2f81f0c6fb68d Mon Sep 17 00:00:00 2001 From: zhouyy Date: Wed, 12 Mar 2025 18:11:49 +0800 Subject: [PATCH 4/7] fix: properly handle resource cleanup in http request executor --- .../workflow/nodes/http_request/executor.py | 87 ++++++++++--------- 1 file changed, 47 insertions(+), 40 deletions(-) diff --git a/api/core/workflow/nodes/http_request/executor.py b/api/core/workflow/nodes/http_request/executor.py index bf28222de0..577f111ec6 100644 --- a/api/core/workflow/nodes/http_request/executor.py +++ b/api/core/workflow/nodes/http_request/executor.py @@ -188,7 +188,13 @@ class Executor: if file_variable is None: raise FileFetchError(f"cannot fetch file with selector {file_selector}") file = file_variable.value - self.content = file_manager.download(file) + try: + self.content = file_manager.download(file) + except Exception as e: + # 确保在发生异常时也能清理资源 + if hasattr(self.content, 'close'): + self.content.close() + raise e case "x-www-form-urlencoded": form_data = { self.variable_pool.convert_template(item.key).text: self.variable_pool.convert_template( @@ -266,41 +272,34 @@ class Executor: return headers def _validate_and_parse_response(self, response: httpx.Response) -> Response: - executor_response = Response(response) + try: + executor_response = Response(response) - threshold_size = ( - dify_config.HTTP_REQUEST_NODE_MAX_BINARY_SIZE - if executor_response.is_file - else dify_config.HTTP_REQUEST_NODE_MAX_TEXT_SIZE - ) - if executor_response.size > threshold_size: - raise ResponseSizeError( - f"{'File' if executor_response.is_file else 'Text'} size is too large," - f" max size is {threshold_size / 1024 / 1024:.2f} MB," - f" but current size is {executor_response.readable_size}." + threshold_size = ( + dify_config.HTTP_REQUEST_NODE_MAX_BINARY_SIZE + if executor_response.is_file + else dify_config.HTTP_REQUEST_NODE_MAX_TEXT_SIZE ) + if executor_response.size > threshold_size: + raise ResponseSizeError( + f"{'File' if executor_response.is_file else 'Text'} size is too large," + f" max size is {threshold_size / 1024 / 1024:.2f} MB," + f" but current size is {executor_response.readable_size}." + ) - return executor_response + return executor_response + except Exception as e: + # 确保在发生异常时关闭响应 + response.close() + raise e def _do_http_request(self, headers: dict[str, Any]) -> httpx.Response: """ do http request depending on api bundle """ if self.method not in { - "get", - "head", - "post", - "put", - "delete", - "patch", - "options", - "GET", - "POST", - "PUT", - "PATCH", - "DELETE", - "HEAD", - "OPTIONS", + "get", "head", "post", "put", "delete", "patch", "options", + "GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS", }: raise InvalidHttpMethodError(f"Invalid http method {self.method}") @@ -316,21 +315,29 @@ class Executor: "follow_redirects": True, "max_retries": self.max_retries, } - # request_args = {k: v for k, v in request_args.items() if v is not None} - try: - response = getattr(ssrf_proxy, self.method.lower())(**request_args) - except (ssrf_proxy.MaxRetriesExceededError, httpx.RequestError) as e: - raise HttpRequestNodeError(str(e)) - # FIXME: fix type ignore, this maybe httpx type issue - return response # type: ignore + + # 使用 with 语句来确保资源正确释放 + with httpx.Client() as client: + try: + response = getattr(client, self.method.lower())(**request_args) + # 创建一个新的 Response 对象并复制需要的数据 + # 这样可以安全地关闭原始响应 + copied_response = response.copy() + response.close() + return copied_response + except (ssrf_proxy.MaxRetriesExceededError, httpx.RequestError) as e: + raise HttpRequestNodeError(str(e)) def invoke(self) -> Response: - # assemble headers - headers = self._assembling_headers() - # do http request - response = self._do_http_request(headers) - # validate response - return self._validate_and_parse_response(response) + response = None + try: + headers = self._assembling_headers() + response = self._do_http_request(headers) + return self._validate_and_parse_response(response) + except Exception as e: + if response is not None: + response.close() + raise e def to_log(self): url_parts = urlparse(self.url) From f2a08302ae6691f30abd57368e526489f4d753de Mon Sep 17 00:00:00 2001 From: crazywoola <427733928@qq.com> Date: Tue, 18 Mar 2025 15:18:52 +0800 Subject: [PATCH 5/7] fix: lint --- .../workflow/nodes/http_request/executor.py | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/api/core/workflow/nodes/http_request/executor.py b/api/core/workflow/nodes/http_request/executor.py index 577f111ec6..6419432936 100644 --- a/api/core/workflow/nodes/http_request/executor.py +++ b/api/core/workflow/nodes/http_request/executor.py @@ -192,7 +192,7 @@ class Executor: self.content = file_manager.download(file) except Exception as e: # 确保在发生异常时也能清理资源 - if hasattr(self.content, 'close'): + if hasattr(self.content, "close"): self.content.close() raise e case "x-www-form-urlencoded": @@ -298,8 +298,20 @@ class Executor: do http request depending on api bundle """ if self.method not in { - "get", "head", "post", "put", "delete", "patch", "options", - "GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS", + "get", + "head", + "post", + "put", + "delete", + "patch", + "options", + "GET", + "POST", + "PUT", + "PATCH", + "DELETE", + "HEAD", + "OPTIONS", }: raise InvalidHttpMethodError(f"Invalid http method {self.method}") @@ -315,7 +327,7 @@ class Executor: "follow_redirects": True, "max_retries": self.max_retries, } - + # 使用 with 语句来确保资源正确释放 with httpx.Client() as client: try: From 27d5535b540ba117a46bc97ca6a02f74aeba0abc Mon Sep 17 00:00:00 2001 From: zhouyy Date: Tue, 18 Mar 2025 17:59:52 +0800 Subject: [PATCH 6/7] fix: improve exception handling and resource cleanup in HTTP request executor --- api/core/workflow/nodes/http_request/executor.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/api/core/workflow/nodes/http_request/executor.py b/api/core/workflow/nodes/http_request/executor.py index 18d08bb163..c65e9614a6 100644 --- a/api/core/workflow/nodes/http_request/executor.py +++ b/api/core/workflow/nodes/http_request/executor.py @@ -283,7 +283,7 @@ class Executor: return executor_response except Exception as e: - # 确保在发生异常时关闭响应 + # Ensure response is closed when an exception occurs response.close() raise e @@ -310,12 +310,12 @@ class Executor: "max_retries": self.max_retries, } - # 使用 with 语句来确保资源正确释放 + # Use with statement to ensure proper resource cleanup with httpx.Client() as client: try: response = getattr(client, self.method.lower())(**request_args) - # 创建一个新的 Response 对象并复制需要的数据 - # 这样可以安全地关闭原始响应 + # Create a new Response object and copy required data + # This allows safe closure of the original response copied_response = response.copy() response.close() return copied_response From 97f7a29fec48c8a4c7b9df765938b1a4ff7195c7 Mon Sep 17 00:00:00 2001 From: crazywoola <427733928@qq.com> Date: Wed, 19 Mar 2025 19:06:46 +0800 Subject: [PATCH 7/7] fix: lint --- .../workflow/nodes/http_request/executor.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/api/core/workflow/nodes/http_request/executor.py b/api/core/workflow/nodes/http_request/executor.py index c65e9614a6..a37aa0fcc4 100644 --- a/api/core/workflow/nodes/http_request/executor.py +++ b/api/core/workflow/nodes/http_request/executor.py @@ -292,8 +292,20 @@ class Executor: do http request depending on api bundle """ if self.method not in { - "get", "head", "post", "put", "delete", "patch", "options", - "GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS", + "get", + "head", + "post", + "put", + "delete", + "patch", + "options", + "GET", + "POST", + "PUT", + "PATCH", + "DELETE", + "HEAD", + "OPTIONS", }: raise InvalidHttpMethodError(f"Invalid http method {self.method}") @@ -309,7 +321,7 @@ class Executor: "follow_redirects": True, "max_retries": self.max_retries, } - + # Use with statement to ensure proper resource cleanup with httpx.Client() as client: try: