merge migration

This commit is contained in:
jyong 2024-08-21 16:25:18 +08:00
parent e7762b731c
commit 067b956b2c
2 changed files with 46 additions and 11 deletions

View File

@@ -1,4 +1,6 @@
 import json
+import random
+import time
 from copy import deepcopy
 from datetime import datetime, timezone
 from typing import Any, Union, Optional
@@ -20,7 +22,10 @@ from models.dataset import (
     Document,
     DocumentSegment, ExternalApiTemplates, ExternalKnowledgeBindings,
 )
+from models.model import UploadFile
+from services.dataset_service import DocumentService
 from services.entities.external_knowledge_entities.external_knowledge_entities import Authorization, ApiTemplateSetting
+from tasks.external_document_indexing_task import external_document_indexing_task
 class ExternalDatasetService:
@@ -142,9 +147,13 @@ class ExternalDatasetService:
                 raise ValueError(f'{parameter.get("name")} is required')

     @staticmethod
-    def init_external_dataset(tenant_id: str, user_id: str, args: dict):
+    def init_external_dataset(tenant_id: str, user_id: str, args: dict, created_from: str = 'web'):
         api_template_id = args.get('api_template_id')
         data_source = args.get('data_source')
+        if data_source is None:
+            raise ValueError('data source is required')
         process_parameter = args.get('process_parameter')
         api_template = ExternalApiTemplates.query.filter_by(
             id=api_template_id,
@@ -152,13 +161,7 @@ class ExternalDatasetService:
         ).first()
         if api_template is None:
             raise ValueError('api template not found')
-        settings = json.loads(api_template.settings)
-        for settings in settings:
-            if settings.get('method') == 'create':
-                ExternalDatasetService.process_external_api(api_template_id, data_source, process_parameter)
-                break
-        # save dataset
         dataset = Dataset(
             tenant_id=tenant_id,
             name=args.get('name'),
@@ -168,8 +171,38 @@ class ExternalDatasetService:
         )
         db.session.add(dataset)
-        db.session.commit()
+        db.session.flush()
+        position = DocumentService.get_documents_position(dataset.id)
+        batch = time.strftime('%Y%m%d%H%M%S') + str(random.randint(100000, 999999))
+        document_ids = []
+        if data_source["type"] == "upload_file":
+            upload_file_list = data_source["info_list"]['file_info_list']['file_ids']
+            for file_id in upload_file_list:
+                file = db.session.query(UploadFile).filter(
+                    UploadFile.tenant_id == dataset.tenant_id,
+                    UploadFile.id == file_id
+                ).first()
+                if file:
+                    data_source_info = {
+                        "upload_file_id": file_id,
+                    }
+                    document = Document(
+                        tenant_id=dataset.tenant_id,
+                        dataset_id=dataset.id,
+                        position=position,
+                        data_source_type=data_source["type"],
+                        data_source_info=json.dumps(data_source_info),
+                        batch=batch,
+                        name=file.name,
+                        created_from=created_from,
+                        created_by=user_id,
+                    )
+                    position += 1
+                    db.session.add(document)
+                    db.session.flush()
+                    document_ids.append(document.id)
+        db.session.commit()
         external_document_indexing_task.delay(dataset.id, api_template_id, data_source, process_parameter)
         return dataset
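
Note: a minimal caller sketch for the updated init_external_dataset, assuming an args payload shaped the way the code above reads it; the module path, tenant/user ids, and payload values are hypothetical, not part of this commit.

# Hypothetical caller sketch; only the args structure is taken from the diff above.
from services.external_dataset_service import ExternalDatasetService  # assumed module path

args = {
    'name': 'external-kb',             # hypothetical dataset name
    'api_template_id': 'tmpl-123',     # hypothetical ExternalApiTemplates id
    'process_parameter': {},           # forwarded to external_document_indexing_task
    'data_source': {
        'type': 'upload_file',
        'info_list': {
            'file_info_list': {
                'file_ids': ['file-abc'],   # UploadFile ids owned by the tenant
            },
        },
    },
}

# created_from defaults to 'web'; another value (e.g. 'api') would be stored on each Document.
dataset = ExternalDatasetService.init_external_dataset(
    tenant_id='tenant-1', user_id='user-1', args=args, created_from='web'
)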

View File

@@ -59,9 +59,11 @@ def external_document_indexing_task(dataset_id: str, api_template_id: str, data_
         # do http request
         response = ExternalDatasetService.process_external_api(settings, headers, process_parameter, files)
-        if response.status_code != 200:
-            logging.info(click.style('Processed external dataset: {} failed, status code: {}'.format(dataset.id, response.status_code), fg='red'))
-            return
+        job_id = response.json().get('job_id')
+        if job_id:
+            # save job_id to dataset
+            dataset.job_id = job_id
+            db.session.commit()
         end_at = time.perf_counter()
         logging.info(
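
Note: the task change above assumes the external API replies with a JSON body carrying a job identifier; a rough sketch of that assumed contract, with a hypothetical payload value:

# Sketch of the assumed response contract, not a documented external API.
# process_external_api is expected to return an HTTP response whose JSON body
# may look like {"job_id": "ext-job-42"} (hypothetical value).
response = ExternalDatasetService.process_external_api(settings, headers, process_parameter, files)
job_id = response.json().get('job_id')
if job_id:
    # persist the external job id on the dataset so the remote job can be tracked later
    dataset.job_id = job_id
    db.session.commit()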