add external knowledge

This commit is contained in:
jyong 2024-08-20 11:13:29 +08:00
parent bbb6fcc4f0
commit 517cdb2ca4
5 changed files with 524 additions and 0 deletions

View File

@ -0,0 +1,213 @@
import flask_restful
from flask import request
from flask_login import current_user
from flask_restful import Resource, marshal, marshal_with, reqparse
from werkzeug.exceptions import Forbidden, NotFound
import services
from configs import dify_config
from controllers.console import api
from controllers.console.apikey import api_key_fields, api_key_list
from controllers.console.app.error import ProviderNotInitializeError
from controllers.console.datasets.error import DatasetInUseError, DatasetNameDuplicateError, IndexingEstimateError
from controllers.console.setup import setup_required
from controllers.console.wraps import account_initialization_required
from core.errors.error import LLMBadRequestError, ProviderTokenNotInitError
from core.indexing_runner import IndexingRunner
from core.model_runtime.entities.model_entities import ModelType
from core.provider_manager import ProviderManager
from core.rag.datasource.vdb.vector_type import VectorType
from core.rag.extractor.entity.extract_setting import ExtractSetting
from core.rag.retrieval.retrival_methods import RetrievalMethod
from extensions.ext_database import db
from fields.app_fields import related_app_list
from fields.dataset_fields import dataset_detail_fields, dataset_query_detail_fields
from fields.document_fields import document_status_fields
from libs.login import login_required
from models.dataset import Dataset, Document, DocumentSegment
from models.model import ApiToken, UploadFile
from services.dataset_service import DatasetPermissionService, DatasetService, DocumentService
from services.external_knowledge_service import ExternalDatasetService
def _validate_name(name):
if not name or len(name) < 1 or len(name) > 100:
raise ValueError('Name must be between 1 to 100 characters.')
return name
def _validate_description_length(description):
if len(description) > 400:
raise ValueError('Description cannot exceed 400 characters.')
return description
class ExternalApiTemplateListApi(Resource):
@setup_required
@login_required
@account_initialization_required
def get(self):
page = request.args.get('page', default=1, type=int)
limit = request.args.get('limit', default=20, type=int)
search = request.args.get('keyword', default=None, type=str)
api_templates, total = ExternalDatasetService.get_external_api_templates(
page,
limit,
current_user.current_tenant_id,
search
)
response = {
'data': [item.to_dict() for item in api_templates],
'has_more': len(api_templates) == limit,
'limit': limit,
'total': total,
'page': page
}
return response, 200
@setup_required
@login_required
@account_initialization_required
def post(self):
parser = reqparse.RequestParser()
parser.add_argument('name', nullable=False, required=True,
help='type is required. Name must be between 1 to 100 characters.',
type=_validate_name)
parser.add_argument('settings', type=list, location='json',
nullable=False,
required=True, )
args = parser.parse_args()
ExternalDatasetService.validate_api_list(args['settings'])
# The role of the current user in the ta table must be admin, owner, or editor, or dataset_operator
if not current_user.is_dataset_editor:
raise Forbidden()
try:
api_template = ExternalDatasetService.create_api_template(
tenant_id=current_user.current_tenant_id,
user_id=current_user.id,
args=args
)
except services.errors.dataset.DatasetNameDuplicateError:
raise DatasetNameDuplicateError()
return api_template.to_dict(), 201
class ExternalApiTemplateApi(Resource):
@setup_required
@login_required
@account_initialization_required
def get(self, api_template_id):
api_template_id = str(api_template_id)
api_template = ExternalDatasetService.get_api_template(api_template_id)
if api_template is None:
raise NotFound("API template not found.")
return api_template.to_dict(), 200
@setup_required
@login_required
@account_initialization_required
def patch(self, api_template_id):
api_template_id = str(api_template_id)
parser = reqparse.RequestParser()
parser.add_argument('name', nullable=False, required=True,
help='type is required. Name must be between 1 to 100 characters.',
type=_validate_name)
parser.add_argument('settings', type=list, location='json',
nullable=False,
required=True, )
args = parser.parse_args()
ExternalDatasetService.validate_api_list(args['settings'])
api_template = ExternalDatasetService.update_api_template(
tenant_id=current_user.current_tenant_id,
user_id=current_user.id,
api_template_id=api_template_id,
args=args
)
return api_template.to_dict(), 200
@setup_required
@login_required
@account_initialization_required
def delete(self, api_template_id):
api_template_id = str(api_template_id)
# The role of the current user in the ta table must be admin, owner, or editor
if not current_user.is_editor or current_user.is_dataset_operator:
raise Forbidden()
ExternalDatasetService.delete_api_template(current_user.current_tenant_id, api_template_id)
return {'result': 'success'}, 204
class ExternalApiUseCheckApi(Resource):
@setup_required
@login_required
@account_initialization_required
def get(self, api_template_id):
api_template_id = str(api_template_id)
external_api_template_is_using = ExternalDatasetService.external_api_template_use_check(api_template_id)
return {'is_using': external_api_template_is_using}, 200
class ExternalDatasetInitApi(Resource):
@setup_required
@login_required
@account_initialization_required
def post(self):
# The role of the current user in the ta table must be admin, owner, or editor
if not current_user.is_editor:
raise Forbidden()
parser = reqparse.RequestParser()
parser.add_argument('api_template_id', type=str, required=True, nullable=True, location='json')
parser.add_argument('name', nullable=False, required=True,
help='name is required. Name must be between 1 to 100 characters.',
type=_validate_name)
parser.add_argument('description', type=str, required=True, nullable=True, location='json')
parser.add_argument('data_source', type=dict, required=True, nullable=True, location='json')
parser.add_argument('process_parameter', type=dict, required=True, nullable=True, location='json')
args = parser.parse_args()
# The role of the current user in the ta table must be admin, owner, or editor, or dataset_operator
if not current_user.is_dataset_editor:
raise Forbidden()
# validate args
ExternalDatasetService.document_create_args_validate(
current_user.current_tenant_id,
args['api_template_id'],
args['process_parameter']
)
try:
dataset, documents, batch = ExternalDatasetService.init_external_dataset(
tenant_id=current_user.current_tenant_id,
user_id=current_user.id,
args=args,
)
except Exception as ex:
raise ProviderNotInitializeError(ex.description)
response = {
'dataset': dataset,
'documents': documents,
'batch': batch
}
return response
api.add_resource(ExternalApiTemplateListApi, '/datasets/external-api-template')
api.add_resource(ExternalApiTemplateApi, '/datasets/external-api-template/<uuid:api_template_id>')
api.add_resource(ExternalApiUseCheckApi, '/datasets/external-api-template/<uuid:api_template_id>/use-check')

View File

@ -0,0 +1,11 @@
from flask_restful import fields
from libs.helper import TimestampField
api_template_query_detail_fields = {
"id": fields.String,
"name": fields.String,
"setting": fields.String,
"created_by": fields.String,
"created_at": TimestampField,
}

View File

@ -682,3 +682,65 @@ class DatasetPermission(db.Model):
tenant_id = db.Column(StringUUID, nullable=False)
has_permission = db.Column(db.Boolean, nullable=False, server_default=db.text('true'))
created_at = db.Column(db.DateTime, nullable=False, server_default=db.text('CURRENT_TIMESTAMP(0)'))
class ExternalApiTemplates(db.Model):
__tablename__ = 'external_api_templates'
__table_args__ = (
db.PrimaryKeyConstraint('id', name='external_api_template_pkey'),
db.Index('external_api_templates_tenant_idx', 'tenant_id'),
db.Index('external_api_templates_name_idx', 'name'),
)
id = db.Column(StringUUID, nullable=False,
server_default=db.text('uuid_generate_v4()'))
name = db.Column(db.String(255), nullable=False)
tenant_id = db.Column(StringUUID, nullable=False)
settings = db.Column(db.Text, nullable=True)
created_by = db.Column(StringUUID, nullable=False)
created_at = db.Column(db.DateTime, nullable=False,
server_default=db.text('CURRENT_TIMESTAMP(0)'))
updated_by = db.Column(StringUUID, nullable=True)
updated_at = db.Column(db.DateTime, nullable=False,
server_default=db.text('CURRENT_TIMESTAMP(0)'))
def to_dict(self):
return {
'id': self.id,
'tenant_id': self.tenant_id,
'name': self.name,
'settings': self.settings_dict,
'created_by': self.created_by,
'created_at': self.created_at,
}
@property
def settings_dict(self):
try:
return json.loads(self.settings) if self.settings else None
except JSONDecodeError:
return None
class ExternalKnowledgeBindings(db.Model):
__tablename__ = 'external_knowledge_bindings'
__table_args__ = (
db.PrimaryKeyConstraint('id', name='external_knowledge_bindings_pkey'),
db.Index('external_knowledge_bindings_tenant_idx', 'tenant_id'),
db.Index('external_knowledge_bindings_dataset_idx', 'dataset_id'),
db.Index('external_knowledge_bindings_external_knowledge_idx', 'external_knowledge_id'),
db.Index('external_knowledge_bindings_external_api_template_idx', 'external_api_template_id'),
)
id = db.Column(StringUUID, nullable=False,
server_default=db.text('uuid_generate_v4()'))
tenant_id = db.Column(StringUUID, nullable=False)
external_api_template_id = db.Column(StringUUID, nullable=False)
dataset_id = db.Column(StringUUID, nullable=False)
external_knowledge_id = db.Column(db.Text, nullable=False)
created_by = db.Column(StringUUID, nullable=False)
created_at = db.Column(db.DateTime, nullable=False,
server_default=db.text('CURRENT_TIMESTAMP(0)'))
updated_by = db.Column(StringUUID, nullable=True)
updated_at = db.Column(db.DateTime, nullable=False,
server_default=db.text('CURRENT_TIMESTAMP(0)'))

View File

@ -0,0 +1,176 @@
import json
from datetime import datetime, timezone
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from libs import helper
from models.account import Account, TenantAccountRole
from models.dataset import (
AppDatasetJoin,
Dataset,
DatasetCollectionBinding,
DatasetPermission,
DatasetProcessRule,
DatasetQuery,
Document,
DocumentSegment, ExternalApiTemplates, ExternalKnowledgeBindings,
)
class ExternalDatasetService:
@staticmethod
def get_external_api_templates(page, per_page, tenant_id, search=None) -> tuple[list[ExternalApiTemplates], int]:
query = ExternalApiTemplates.query.filter(ExternalApiTemplates.tenant_id == tenant_id).order_by(
ExternalApiTemplates.created_at.desc()
)
if search:
query = query.filter(ExternalApiTemplates.name.ilike(f'%{search}%'))
api_templates = query.paginate(
page=page,
per_page=per_page,
max_per_page=100,
error_out=False
)
return api_templates.items, api_templates.total
@classmethod
def validate_api_list(cls, api_settings: list[dict]):
if not api_settings:
raise ValueError('api list is empty')
for api_settings_dict in api_settings:
if not api_settings_dict.get('method'):
raise ValueError('api name is required')
if not api_settings_dict.get('url'):
raise ValueError('api url is required')
if api_settings_dict.get('authorization'):
if not api_settings_dict.get('authorization').get('type'):
raise ValueError('authorization type is required')
if api_settings_dict.get('authorization').get('type') == 'bearer':
if not api_settings_dict.get('authorization').get('api_key'):
raise ValueError('authorization token is required')
if api_settings_dict.get('authorization').get('type') == 'custom':
if not api_settings_dict.get('authorization').get('header'):
raise ValueError('authorization header is required')
if api_settings_dict.get('method') in ['create', 'update']:
if not api_settings_dict.get('callback_setting'):
raise ValueError('callback_setting is required for create and update method')
@staticmethod
def create_api_template(tenant_id: str, user_id: str, args: dict) -> ExternalApiTemplates:
api_template = ExternalApiTemplates(
tenant_id=tenant_id,
created_by=user_id,
updated_by=user_id,
name=args.get('name'),
settings=json.dumps(args.get('settings'), ensure_ascii=False),
)
db.session.add(api_template)
db.session.commit()
return api_template
@staticmethod
def get_api_template(api_template_id: str) -> ExternalApiTemplates:
return ExternalApiTemplates.query.filter_by(
id=api_template_id
).first()
@staticmethod
def update_api_template(tenant_id, user_id, api_template_id, args) -> ExternalApiTemplates:
api_template = ExternalApiTemplates.query.filter_by(
id=api_template_id,
tenant_id=tenant_id
).first()
if api_template is None:
raise ValueError('api template not found')
api_template.name = args.get('name')
api_template.settings = json.dumps(args.get('settings'), ensure_ascii=False)
api_template.updated_by = user_id
api_template.updated_at = datetime.now(timezone.utc).replace(tzinfo=None)
db.session.commit()
return api_template
@staticmethod
def delete_api_template(tenant_id: str, api_template_id: str):
api_template = ExternalApiTemplates.query.filter_by(
id=api_template_id,
tenant_id=tenant_id
).first()
if api_template is None:
raise ValueError('api template not found')
db.session.delete(api_template)
db.session.commit()
@staticmethod
def external_api_template_use_check(api_template_id: str) -> bool:
count = ExternalKnowledgeBindings.query.filter_by(
external_api_template_id=api_template_id
).count()
if count > 0:
return True
return False
@staticmethod
def document_create_args_validate(tenant_id: str, api_template_id: str, process_parameter: dict):
api_template = ExternalApiTemplates.query.filter_by(
id=api_template_id,
tenant_id=tenant_id
).first()
if api_template is None:
raise ValueError('api template not found')
settings = json.loads(api_template.settings)
for settings in settings:
if settings.get('method') == 'create':
parameters = settings.get('parameters')
for parameter in parameters:
if parameter.get('required') and not process_parameter.get(parameter.get('name')):
raise ValueError(f'{parameter.get("name")} is required')
@staticmethod
def init_external_dataset(tenant_id: str, user_id: str, args: dict):
api_template_id = args.get('api_template_id')
data_source = args.get('data_source')
process_parameter = args.get('process_parameter')
api_template = ExternalApiTemplates.query.filter_by(
id=api_template_id,
tenant_id=tenant_id
).first()
if api_template is None:
raise ValueError('api template not found')
settings = json.loads(api_template.settings)
for settings in settings:
if settings.get('method') == 'create':
ExternalDatasetService.process_external_api(api_template_id, data_source, process_parameter)
break
# save dataset
dataset = Dataset(
tenant_id=tenant_id,
name=args.get('name'),
description=args.get('description', ''),
provider='external',
created_by=user_id,
)
db.session.add(dataset)
db.session.commit()
external_document_indexing_task.delay(dataset.id, api_template_id, data_source, process_parameter)
return dataset
@staticmethod
def process_external_api(api_template_id: str, data_source: dict, process_parameter: dict):
pass

View File

@ -0,0 +1,62 @@
import datetime
import logging
import time
import click
from celery import shared_task
from configs import dify_config
from core.indexing_runner import DocumentIsPausedException, IndexingRunner
from extensions.ext_database import db
from models.dataset import Dataset, Document, ExternalApiTemplates
from models.model import UploadFile
from services.feature_service import FeatureService
@shared_task(queue='dataset')
def external_document_indexing_task(dataset_id: str, api_template_id: str, data_source: dict, process_parameter: dict):
"""
Async process document
:param dataset_id:
:param api_template_id:
:param data_source:
:param process_parameter:
Usage: external_document_indexing_task.delay(dataset_id, document_id)
"""
documents = []
start_at = time.perf_counter()
dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first()
if not dataset:
logging.info(click.style('Processed external dataset: {} failed, dataset not exit.'.format(dataset_id), fg='red'))
return
# get external api template
api_template = db.session.query(ExternalApiTemplates).filter(
ExternalApiTemplates.id == api_template_id,
ExternalApiTemplates.tenant_id == dataset.tenant_id
).first()
if not api_template:
logging.info(click.style('Processed external dataset: {} failed, api template: {} not exit.'.format(dataset_id, api_template_id), fg='red'))
return
file_resource = []
if data_source["type"] == "upload_file":
upload_file_list = data_source["info_list"]['file_info_list']['file_ids']
for file_id in upload_file_list:
file = db.session.query(UploadFile).filter(
UploadFile.tenant_id == dataset.tenant_id,
UploadFile.id == file_id
).first()
if file:
file_resource.append(file)
try:
# assemble headers
headers = self._assembling_headers()
# do http request
response = self._do_http_request(headers)
except DocumentIsPausedException as ex:
logging.info(click.style(str(ex), fg='yellow'))
except Exception:
pass