external knowledge

This commit is contained in:
jyong 2024-08-20 16:18:35 +08:00
parent f6c8390b0b
commit e7762b731c
3 changed files with 90 additions and 14 deletions

View File

@ -0,0 +1,29 @@
from typing import Literal, Union, Optional
from pydantic import BaseModel
class AuthorizationConfig(BaseModel):
    """Credential details used when `Authorization.type` is 'api-key'."""
    # Auth scheme; None means the scheme is unspecified.
    type: Literal[None, 'basic', 'bearer', 'custom']
    # Secret placed into the header value.
    api_key: Optional[str] = None
    # Header name to carry the credential; defaults to 'Authorization' downstream.
    header: Optional[str] = None
class Authorization(BaseModel):
    """Top-level auth switch for an external-knowledge API template."""
    # 'no-auth' sends requests unchanged; 'api-key' applies `config`.
    type: Literal['no-auth', 'api-key']
    # Required when type == 'api-key' (enforced by the caller, not here).
    config: Optional[AuthorizationConfig] = None
class ProcessStatusSetting(BaseModel):
    """Callback endpoint used to report processing status back to the template owner."""
    # HTTP verb for the callback request.
    request_method: str
    # Callback URL.
    url: str
class ApiTemplateSetting(BaseModel):
    """Parsed settings of an external-knowledge API template (see get_api_template_settings)."""
    # NOTE(review): both `method` and `request_method` are declared; process_external_api
    # only reads `request_method` — confirm whether `method` is still needed.
    method: str
    url: str
    request_method: str
    authorization: Authorization
    headers: Optional[dict] = None
    params: Optional[dict] = None
    callback_setting: Optional[ProcessStatusSetting] = None

View File

@ -1,5 +1,9 @@
import json
from copy import deepcopy
from datetime import datetime, timezone
from typing import Any, Union, Optional
import httpx
from core.helper import ssrf_proxy
from extensions.ext_database import db
@ -16,7 +20,7 @@ from models.dataset import (
Document,
DocumentSegment, ExternalApiTemplates, ExternalKnowledgeBindings,
)
from services.entities.external_knowledge_entities.external_knowledge_entities import Authorization, ApiTemplateSetting
class ExternalDatasetService:
@ -171,20 +175,52 @@ class ExternalDatasetService:
return dataset
@staticmethod
def process_external_api(settings: ApiTemplateSetting,
                         headers: Union[None, dict[str, Any]],
                         parameter: Union[None, dict[str, Any]],
                         files: Union[None, dict[str, Any]]) -> httpx.Response:
    """
    Perform the HTTP request described by an external-knowledge API template.

    :param settings: parsed template supplying the target url and request_method
    :param parameter: request body payload, forwarded as `data`; may be None
    :param files: multipart file payloads keyed by field name; may be None
    :return: the httpx.Response produced by the proxied request
    :raises ValueError: if settings.request_method is not a supported HTTP verb
    """
    kwargs = {
        'url': settings.url,
        'headers': headers,
        'follow_redirects': True,
    }
    # ssrf_proxy exposes per-verb wrappers (get/post/...) that guard outbound requests.
    if settings.request_method in ('get', 'head', 'post', 'put', 'delete', 'patch'):
        response = getattr(ssrf_proxy, settings.request_method)(data=parameter, files=files, **kwargs)
    else:
        raise ValueError(f'Invalid http method {settings.request_method}')
    return response
@staticmethod
def assembling_headers(authorization: Authorization, headers: Optional[dict] = None) -> dict[str, Any]:
authorization = deepcopy(authorization)
if headers:
headers = deepcopy(headers)
else:
headers= {}
if authorization.type == 'api-key':
if authorization.config is None:
raise ValueError('authorization config is required')
if authorization.config.api_key is None:
raise ValueError('api_key is required')
if not authorization.config.header:
authorization.config.header = 'Authorization'
if authorization.config.type == 'bearer':
headers[authorization.config.header] = f'Bearer {authorization.config.api_key}'
elif authorization.config.type == 'basic':
headers[authorization.config.header] = f'Basic {authorization.config.api_key}'
elif authorization.config.type == 'custom':
headers[authorization.config.header] = authorization.config.api_key
return headers
@staticmethod
def get_api_template_settings(settings: dict) -> ApiTemplateSetting:
    """Validate a raw settings dict into an ApiTemplateSetting model."""
    parsed = ApiTemplateSetting.parse_obj(settings)
    return parsed

View File

@ -1,4 +1,5 @@
import datetime
import json
import logging
import time
@ -8,8 +9,10 @@ from celery import shared_task
from configs import dify_config
from core.indexing_runner import DocumentIsPausedException, IndexingRunner
from extensions.ext_database import db
from extensions.ext_storage import storage
from models.dataset import Dataset, Document, ExternalApiTemplates
from models.model import UploadFile
from services.external_knowledge_service import ExternalDatasetService
from services.feature_service import FeatureService
@ -23,7 +26,6 @@ def external_document_indexing_task(dataset_id: str, api_template_id: str, data_
:param process_parameter:
Usage: external_document_indexing_task.delay(dataset_id, document_id)
"""
documents = []
start_at = time.perf_counter()
dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first()
@ -40,7 +42,7 @@ def external_document_indexing_task(dataset_id: str, api_template_id: str, data_
if not api_template:
logging.info(click.style('Processed external dataset: {} failed, api template: {} not exit.'.format(dataset_id, api_template_id), fg='red'))
return
file_resource = []
files = {}
if data_source["type"] == "upload_file":
upload_file_list = data_source["info_list"]['file_info_list']['file_ids']
for file_id in upload_file_list:
@ -49,14 +51,23 @@ def external_document_indexing_task(dataset_id: str, api_template_id: str, data_
UploadFile.id == file_id
).first()
if file:
file_resource.append(file)
files[file.id] = (file.name, storage.load_once(file.key), file.mime_type)
try:
settings = ExternalDatasetService.get_api_template_settings(json.loads(api_template.settings))
# assemble headers
headers = self._assembling_headers()
headers = ExternalDatasetService.assembling_headers(settings.authorization, settings.headers)
# do http request
response = self._do_http_request(headers)
response = ExternalDatasetService.process_external_api(settings, headers, process_parameter, files)
if response.status_code != 200:
logging.info(click.style('Processed external dataset: {} failed, status code: {}'.format(dataset.id, response.status_code), fg='red'))
return
end_at = time.perf_counter()
logging.info(
click.style('Processed external dataset: {} successful, latency: {}'.format(dataset.id, end_at - start_at), fg='green'))
except DocumentIsPausedException as ex:
logging.info(click.style(str(ex), fg='yellow'))
except Exception:
pass