diff --git a/api/services/entities/external_knowledge_entities/external_knowledge_entities.py b/api/services/entities/external_knowledge_entities/external_knowledge_entities.py
new file mode 100644
index 0000000000..068dbd96ad
--- /dev/null
+++ b/api/services/entities/external_knowledge_entities/external_knowledge_entities.py
@@ -0,0 +1,40 @@
+from typing import Literal, Optional, Union
+
+from pydantic import BaseModel
+
+
+class AuthorizationConfig(BaseModel):
+    """Credential details used when ``Authorization.type`` is ``'api-key'``."""
+
+    # How the key is rendered into the header value: 'bearer' and 'basic'
+    # prefix it, 'custom' sends it verbatim.
+    type: Literal[None, 'basic', 'bearer', 'custom']
+    api_key: Union[None, str] = None
+    # Header name; ``assembling_headers`` falls back to 'Authorization' when empty.
+    header: Union[None, str] = None
+
+
+class Authorization(BaseModel):
+    """Authorization mode of an external knowledge API template."""
+
+    type: Literal['no-auth', 'api-key']
+    config: Optional[AuthorizationConfig] = None
+
+
+class ProcessStatusSetting(BaseModel):
+    """Endpoint description stored under ``ApiTemplateSetting.callback_setting``."""
+
+    request_method: str
+    url: str
+
+
+class ApiTemplateSetting(BaseModel):
+    """Parsed ``settings`` JSON of an external API template."""
+
+    method: str
+    url: str
+    request_method: str
+    authorization: Authorization
+    headers: Optional[dict] = None
+    params: Optional[dict] = None
+    callback_setting: Optional[ProcessStatusSetting] = None
diff --git a/api/services/external_knowledge_service.py b/api/services/external_knowledge_service.py
index 181af66a20..303fbf2013 100644
--- a/api/services/external_knowledge_service.py
+++ b/api/services/external_knowledge_service.py
@@ -1,5 +1,9 @@
 import json
+from copy import deepcopy
 from datetime import datetime, timezone
+from typing import Any, Optional, Union
+
+import httpx
 
 from core.helper import ssrf_proxy
 from extensions.ext_database import db
@@ -16,7 +20,7 @@ from models.dataset import (
     Document,
     DocumentSegment, ExternalApiTemplates, ExternalKnowledgeBindings,
 )
-
+from services.entities.external_knowledge_entities.external_knowledge_entities import Authorization, ApiTemplateSetting
 
 
 class ExternalDatasetService:
@@ -171,20 +175,70 @@ class ExternalDatasetService:
         return dataset
 
     @staticmethod
-    def process_external_api(self, headers: dict[str, Any]) -> httpx.Response:
+    def process_external_api(settings: ApiTemplateSetting,
+                             headers: Union[None, dict[str, Any]],
+                             parameter: Union[None, dict[str, Any]],
+                             files: Union[None, dict[str, Any]]) -> httpx.Response:
         """
-        do http request depending on api bundle
+        Send the HTTP request described by an external API template.
+
+        :param settings: parsed template; provides ``url`` and ``request_method``
+        :param headers: request headers, e.g. the result of ``assembling_headers``
+        :param parameter: payload forwarded to the request as ``data``
+        :param files: multipart files forwarded to the request as ``files``
+        :return: the response produced by the matching ``ssrf_proxy`` helper
+        :raises ValueError: if ``request_method`` is not a supported HTTP verb
         """
+        # ssrf_proxy helpers are looked up by lower-case name; normalise so a
+        # template that stores 'POST' is not rejected.
+        method = settings.request_method.lower()
+        if method not in ('get', 'head', 'post', 'put', 'delete', 'patch'):
+            raise ValueError(f'Invalid http method {settings.request_method}')
+
         kwargs = {
-            'url': self.server_url,
+            'url': settings.url,
             'headers': headers,
-            'params': self.params,
-            'timeout': (self.timeout.connect, self.timeout.read, self.timeout.write),
             'follow_redirects': True,
         }
 
-        if self.method in ('get', 'head', 'post', 'put', 'delete', 'patch'):
-            response = getattr(ssrf_proxy, self.method)(data=self.body, files=self.files, **kwargs)
-        else:
-            raise ValueError(f'Invalid http method {self.method}')
-        return response
\ No newline at end of file
+        return getattr(ssrf_proxy, method)(data=parameter, files=files, **kwargs)
+
+    @staticmethod
+    def assembling_headers(authorization: Authorization, headers: Optional[dict] = None) -> dict[str, Any]:
+        """
+        Build the request headers for an external API call.
+
+        Neither argument is modified; a new dict is returned.
+
+        :param authorization: authorization settings of the api template
+        :param headers: base headers from the template, may be None
+        :return: copy of ``headers`` plus the auth header for 'api-key' templates
+        :raises ValueError: if type is 'api-key' but config or api_key is missing
+        """
+        headers = deepcopy(headers) if headers else {}
+        if authorization.type != 'api-key':
+            return headers
+
+        config = authorization.config
+        if config is None:
+            raise ValueError('authorization config is required')
+        if config.api_key is None:
+            raise ValueError('api_key is required')
+
+        # Resolve the default header name locally rather than deep-copying and
+        # mutating the authorization model.
+        header_name = config.header or 'Authorization'
+        if config.type == 'bearer':
+            headers[header_name] = f'Bearer {config.api_key}'
+        elif config.type == 'basic':
+            headers[header_name] = f'Basic {config.api_key}'
+        elif config.type == 'custom':
+            headers[header_name] = config.api_key
+        # A config.type of None adds no auth header.
+
+        return headers
+
+    @staticmethod
+    def get_api_template_settings(settings: dict) -> ApiTemplateSetting:
+        """Validate a decoded settings dict into an ``ApiTemplateSetting``."""
+        return ApiTemplateSetting.parse_obj(settings)
diff --git a/api/tasks/external_document_indexing_task.py b/api/tasks/external_document_indexing_task.py
index 14509e65c4..31a16fbaf2 100644
--- a/api/tasks/external_document_indexing_task.py
+++ b/api/tasks/external_document_indexing_task.py
@@ -1,4 +1,5 @@
 import datetime
+import json
 import logging
 import time
 
@@ -8,8 +9,10 @@ from celery import shared_task
 from configs import dify_config
 from core.indexing_runner import DocumentIsPausedException, IndexingRunner
 from extensions.ext_database import db
+from extensions.ext_storage import storage
 from models.dataset import Dataset, Document, ExternalApiTemplates
 from models.model import UploadFile
+from services.external_knowledge_service import ExternalDatasetService
 from services.feature_service import FeatureService
 
 
@@ -23,7 +26,6 @@ def external_document_indexing_task(dataset_id: str, api_template_id: str, data_
     :param process_parameter:
     Usage: external_document_indexing_task.delay(dataset_id, document_id)
     """
-    documents = []
     start_at = time.perf_counter()
 
     dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first()
@@ -40,7 +42,7 @@ def external_document_indexing_task(dataset_id: str, api_template_id: str, data_
     if not api_template:
-        logging.info(click.style('Processed external dataset: {} failed, api template: {} not exit.'.format(dataset_id, api_template_id), fg='red'))
+        logging.info(click.style('Processed external dataset: {} failed, api template: {} not exist.'.format(dataset_id, api_template_id), fg='red'))
         return
-    file_resource = []
+    files = {}
     if data_source["type"] == "upload_file":
         upload_file_list = data_source["info_list"]['file_info_list']['file_ids']
         for file_id in upload_file_list:
@@ -49,14 +51,24 @@ def external_document_indexing_task(dataset_id: str, api_template_id: str, data_
                 UploadFile.id == file_id
             ).first()
             if file:
-                file_resource.append(file)
+                files[file.id] = (file.name, storage.load_once(file.key), file.mime_type)
 
     try:
+        settings = ExternalDatasetService.get_api_template_settings(json.loads(api_template.settings))
         # assemble headers
-        headers = self._assembling_headers()
+        headers = ExternalDatasetService.assembling_headers(settings.authorization, settings.headers)
         # do http request
-        response = self._do_http_request(headers)
+        response = ExternalDatasetService.process_external_api(settings, headers, process_parameter, files)
+        if response.status_code != 200:
+            logging.info(click.style('Processed external dataset: {} failed, status code: {}'.format(dataset.id, response.status_code), fg='red'))
+            return
+
+        end_at = time.perf_counter()
+        logging.info(
+            click.style('Processed external dataset: {} successful, latency: {}'.format(dataset.id, end_at - start_at), fg='green'))
     except DocumentIsPausedException as ex:
         logging.info(click.style(str(ex), fg='yellow'))
     except Exception:
-        pass
+        # Record the failure with its traceback instead of discarding it; the
+        # task still ends without re-raising.
+        logging.exception('Processed external dataset: {} failed'.format(dataset_id))