dify/api/tasks/external_document_indexing_task.py

94 lines
3.1 KiB
Python
Raw Normal View History

2024-08-20 16:18:35 +08:00
import json
2024-08-20 11:13:29 +08:00
import logging
import time
import click
from celery import shared_task
2024-09-18 14:36:51 +08:00
from core.indexing_runner import DocumentIsPausedException
2024-08-20 11:13:29 +08:00
from extensions.ext_database import db
2024-08-20 16:18:35 +08:00
from extensions.ext_storage import storage
2024-09-25 12:37:23 +08:00
from models.dataset import Dataset, ExternalKnowledgeApis
2024-08-20 11:13:29 +08:00
from models.model import UploadFile
2024-08-20 16:18:35 +08:00
from services.external_knowledge_service import ExternalDatasetService
2024-08-20 11:13:29 +08:00
2024-09-18 14:36:51 +08:00
@shared_task(queue="dataset")
def external_document_indexing_task(
    dataset_id: str, external_knowledge_api_id: str, data_source: dict, process_parameter: dict
):
    """
    Async task: submit a dataset's source documents to an external knowledge API for indexing.

    :param dataset_id: id of the target Dataset row
    :param external_knowledge_api_id: id of the ExternalKnowledgeApis row holding the
        endpoint settings; must belong to the same tenant as the dataset
    :param data_source: document source descriptor; only type "upload_file" is handled here,
        reading file ids from data_source["info_list"]["file_info_list"]["file_ids"]
    :param process_parameter: processing options forwarded verbatim to the external API

    Usage: external_document_indexing_task.delay(dataset_id, external_knowledge_api_id,
        data_source, process_parameter)
    """
    start_at = time.perf_counter()
    dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first()
    if not dataset:
        logging.info(
            click.style("Processed external dataset: {} failed, dataset not exist.".format(dataset_id), fg="red")
        )
        return

    # Get the external api template, scoped to the dataset's tenant so one tenant
    # cannot index through another tenant's API credentials.
    external_knowledge_api = (
        db.session.query(ExternalKnowledgeApis)
        .filter(
            ExternalKnowledgeApis.id == external_knowledge_api_id, ExternalKnowledgeApis.tenant_id == dataset.tenant_id
        )
        .first()
    )

    if not external_knowledge_api:
        logging.info(
            click.style(
                "Processed external dataset: {} failed, api template: {} not exist.".format(
                    dataset_id, external_knowledge_api_id
                ),
                fg="red",
            )
        )
        return

    files = {}
    if data_source["type"] == "upload_file":
        upload_file_list = data_source["info_list"]["file_info_list"]["file_ids"]
        for file_id in upload_file_list:
            # Only load files owned by the dataset's tenant; unknown ids are skipped silently.
            file = (
                db.session.query(UploadFile)
                .filter(UploadFile.tenant_id == dataset.tenant_id, UploadFile.id == file_id)
                .first()
            )
            if file:
                # (filename, raw content, mime type) triple as consumed by process_external_api
                files[file.id] = (file.name, storage.load_once(file.key), file.mime_type)

    try:
        settings = ExternalDatasetService.get_external_knowledge_api_settings(
            json.loads(external_knowledge_api.settings)
        )
        # assemble headers
        headers = ExternalDatasetService.assembling_headers(settings.authorization, settings.headers)
        # do http request
        response = ExternalDatasetService.process_external_api(settings, headers, process_parameter, files)
        job_id = response.json().get("job_id")
        if job_id:
            # save job_id to dataset so the external indexing job can be tracked later
            dataset.job_id = job_id
            db.session.commit()
        end_at = time.perf_counter()
        logging.info(
            click.style(
                "Processed external dataset: {} successful, latency: {}".format(dataset.id, end_at - start_at),
                fg="green",
            )
        )
    except DocumentIsPausedException as ex:
        logging.info(click.style(str(ex), fg="yellow"))
    except Exception:
        # Previously a silent `pass`: any failure of the external API call vanished without
        # a trace. Keep the task from crashing the worker, but record the full traceback.
        logging.exception("Processed external dataset: %s failed", dataset_id)