diff --git a/api/services/external_knowledge_service.py b/api/services/external_knowledge_service.py
index 303fbf2013..b4d23bce14 100644
--- a/api/services/external_knowledge_service.py
+++ b/api/services/external_knowledge_service.py
@@ -1,4 +1,6 @@
 import json
+import random
+import time
 from copy import deepcopy
 from datetime import datetime, timezone
 from typing import Any, Union, Optional
@@ -20,7 +22,10 @@ from models.dataset import (
     Document,
     DocumentSegment,
     ExternalApiTemplates,
     ExternalKnowledgeBindings,
 )
+from models.model import UploadFile
+from services.dataset_service import DocumentService
 from services.entities.external_knowledge_entities.external_knowledge_entities import Authorization, ApiTemplateSetting
+from tasks.external_document_indexing_task import external_document_indexing_task


 class ExternalDatasetService:
@@ -142,9 +147,13 @@ class ExternalDatasetService:
                     raise ValueError(f'{parameter.get("name")} is required')

     @staticmethod
-    def init_external_dataset(tenant_id: str, user_id: str, args: dict):
+    def init_external_dataset(tenant_id: str, user_id: str, args: dict, created_from: str = 'web'):
         api_template_id = args.get('api_template_id')
+        data_source = args.get('data_source')
+        if data_source is None:
+            raise ValueError('data source is required')
+        process_parameter = args.get('process_parameter')

         api_template = ExternalApiTemplates.query.filter_by(
             id=api_template_id,
@@ -152,13 +161,7 @@ class ExternalDatasetService:
         ).first()
         if api_template is None:
             raise ValueError('api template not found')
-        settings = json.loads(api_template.settings)
-        for settings in settings:
-            if settings.get('method') == 'create':
-                ExternalDatasetService.process_external_api(api_template_id, data_source, process_parameter)
-                break
-        # save dataset
         dataset = Dataset(
             tenant_id=tenant_id,
             name=args.get('name'),
@@ -168,8 +171,38 @@
         )

         db.session.add(dataset)
-        db.session.commit()
+        db.session.flush()

+        position = DocumentService.get_documents_position(dataset.id)
+        batch = time.strftime('%Y%m%d%H%M%S') + str(random.randint(100000, 999999))
+        document_ids = []
+        if data_source["type"] == "upload_file":
+            upload_file_list = data_source["info_list"]['file_info_list']['file_ids']
+            for file_id in upload_file_list:
+                file = db.session.query(UploadFile).filter(
+                    UploadFile.tenant_id == dataset.tenant_id,
+                    UploadFile.id == file_id
+                ).first()
+                if file:
+                    data_source_info = {
+                        "upload_file_id": file_id,
+                    }
+                    document = Document(
+                        tenant_id=dataset.tenant_id,
+                        dataset_id=dataset.id,
+                        position=position,
+                        data_source_type=data_source["type"],
+                        data_source_info=json.dumps(data_source_info),
+                        batch=batch,
+                        name=file.name,
+                        created_from=created_from,
+                        created_by=user_id,
+                    )
+                    position += 1
+                    db.session.add(document)
+                    db.session.flush()
+                    document_ids.append(document.id)
+        db.session.commit()
+        external_document_indexing_task.delay(dataset.id, api_template_id, data_source, process_parameter)

         return dataset
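Note for reviewers: the service change above swaps the premature db.session.commit() for db.session.flush() so that dataset.id is populated while the Document rows are built, then commits the dataset and its documents in one transaction before dispatching the Celery task. A minimal sketch of how a caller might drive this, assuming a hypothetical controller context (only init_external_dataset and the args keys come from this diff; every other name is an assumption):

# Hypothetical caller sketch -- not part of this PR.
args = {
    'name': 'partner-docs',
    'api_template_id': api_template_id,  # id of an existing ExternalApiTemplates row
    'data_source': {
        'type': 'upload_file',
        'info_list': {'file_info_list': {'file_ids': [uploaded_file_id]}},
    },
    'process_parameter': {},  # opaque to this method; forwarded to the external API
}
dataset = ExternalDatasetService.init_external_dataset(
    tenant_id=tenant_id,
    user_id=user_id,
    args=args,
    created_from='api',  # defaults to 'web' when omitted
)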
diff --git a/api/tasks/external_document_indexing_task.py b/api/tasks/external_document_indexing_task.py
index 31a16fbaf2..938cf6962d 100644
--- a/api/tasks/external_document_indexing_task.py
+++ b/api/tasks/external_document_indexing_task.py
@@ -59,9 +59,11 @@ def external_document_indexing_task(dataset_id: str, api_template_id: str, data_
         # do http request
         response = ExternalDatasetService.process_external_api(settings, headers, process_parameter, files)
-        if response.status_code != 200:
-            logging.info(click.style('Processed external dataset: {} failed, status code: {}'.format(dataset.id, response.status_code), fg='red'))
-            return
+        job_id = response.json().get('job_id')
+        if job_id:
+            # save job_id to dataset
+            dataset.job_id = job_id
+            db.session.commit()
         end_at = time.perf_counter()
         logging.info(
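Note for reviewers: the task change above no longer treats a non-200 response as a terminal failure; it reads job_id from the response body and stores it on the dataset so the external job can be tracked later. Two side effects worth flagging: a failed request no longer short-circuits the task (the early return is gone), and response.json() raises on a non-JSON body. A defensive variant (a sketch only, not part of this PR) might guard both:

# Sketch only -- assumes `response` is a requests.Response, as in the task above.
job_id = None
if response.status_code == 200:
    try:
        job_id = response.json().get('job_id')
    except ValueError:  # requests raises a ValueError subclass on a non-JSON body
        logging.warning('external API returned a non-JSON body for dataset %s', dataset.id)
if job_id:
    # save job_id to dataset so the external job can be polled later
    dataset.job_id = job_id
    db.session.commit()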