From fa275b9fdca22e69426a81ac29c9a0e2d7767afa Mon Sep 17 00:00:00 2001 From: elvisliu <719880851@qq.com> Date: Tue, 18 Mar 2025 23:35:04 +0800 Subject: [PATCH] feat: support huawei cloud vector database --- api/configs/middleware/__init__.py | 2 + .../middleware/vdb/huawei_cloud_config.py | 25 ++ api/controllers/console/datasets/datasets.py | 2 + .../rag/datasource/vdb/huawei/__init__.py | 0 .../vdb/huawei/huawei_cloud_vector.py | 215 ++++++++++++++++++ api/core/rag/datasource/vdb/vector_factory.py | 4 + api/core/rag/datasource/vdb/vector_type.py | 1 + docker/.env.example | 5 + docker/docker-compose.yaml | 3 + 9 files changed, 257 insertions(+) create mode 100644 api/configs/middleware/vdb/huawei_cloud_config.py create mode 100644 api/core/rag/datasource/vdb/huawei/__init__.py create mode 100644 api/core/rag/datasource/vdb/huawei/huawei_cloud_vector.py diff --git a/api/configs/middleware/__init__.py b/api/configs/middleware/__init__.py index 3bd638bc74..ff84f34976 100644 --- a/api/configs/middleware/__init__.py +++ b/api/configs/middleware/__init__.py @@ -22,6 +22,7 @@ from .vdb.baidu_vector_config import BaiduVectorDBConfig from .vdb.chroma_config import ChromaConfig from .vdb.couchbase_config import CouchbaseConfig from .vdb.elasticsearch_config import ElasticsearchConfig +from .vdb.huawei_cloud_config import HuaweiCloudConfig from .vdb.lindorm_config import LindormConfig from .vdb.milvus_config import MilvusConfig from .vdb.myscale_config import MyScaleConfig @@ -262,6 +263,7 @@ class MiddlewareConfig( VectorStoreConfig, AnalyticdbConfig, ChromaConfig, + HuaweiCloudConfig, MilvusConfig, MyScaleConfig, OpenSearchConfig, diff --git a/api/configs/middleware/vdb/huawei_cloud_config.py b/api/configs/middleware/vdb/huawei_cloud_config.py new file mode 100644 index 0000000000..2290c60499 --- /dev/null +++ b/api/configs/middleware/vdb/huawei_cloud_config.py @@ -0,0 +1,25 @@ +from typing import Optional + +from pydantic import Field +from pydantic_settings import BaseSettings + + +class HuaweiCloudConfig(BaseSettings): + """ + Configuration settings for Huawei cloud search service + """ + + HUAWEI_CLOUD_HOSTS: Optional[str] = Field( + description="Hostname or IP address of the Huawei cloud search service instance", + default=None, + ) + + HUAWEI_CLOUD_USER: Optional[str] = Field( + description="Username for authenticating with Huawei cloud search service", + default=None, + ) + + HUAWEI_CLOUD_PASSWORD: Optional[str] = Field( + description="Password for authenticating with Huawei cloud search service", + default=None, + ) diff --git a/api/controllers/console/datasets/datasets.py b/api/controllers/console/datasets/datasets.py index 6532e046c9..92bc9fca09 100644 --- a/api/controllers/console/datasets/datasets.py +++ b/api/controllers/console/datasets/datasets.py @@ -664,6 +664,7 @@ class DatasetRetrievalSettingApi(Resource): | VectorType.COUCHBASE | VectorType.MILVUS | VectorType.OPENGAUSS + | VectorType.HUAWEI_CLOUD ): return { "retrieval_method": [ @@ -708,6 +709,7 @@ class DatasetRetrievalSettingMockApi(Resource): | VectorType.PGVECTOR | VectorType.LINDORM | VectorType.OPENGAUSS + | VectorType.HUAWEI_CLOUD ): return { "retrieval_method": [ diff --git a/api/core/rag/datasource/vdb/huawei/__init__.py b/api/core/rag/datasource/vdb/huawei/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/core/rag/datasource/vdb/huawei/huawei_cloud_vector.py b/api/core/rag/datasource/vdb/huawei/huawei_cloud_vector.py new file mode 100644 index 0000000000..b8b1aebb96 --- /dev/null +++ b/api/core/rag/datasource/vdb/huawei/huawei_cloud_vector.py @@ -0,0 +1,215 @@ +import json +import logging +import ssl +from typing import Any, Optional + +from configs import dify_config +from core.rag.datasource.vdb.field import Field +from core.rag.datasource.vdb.vector_base import BaseVector +from core.rag.datasource.vdb.vector_factory import AbstractVectorFactory +from core.rag.datasource.vdb.vector_type import VectorType +from core.rag.embedding.embedding_base import Embeddings +from core.rag.models.document import Document +from elasticsearch import Elasticsearch +from extensions.ext_redis import redis_client +from models.dataset import Dataset +from pydantic import BaseModel, model_validator + +logger = logging.getLogger(__name__) + + +def create_ssl_context() -> ssl.SSLContext: + ssl_context = ssl.create_default_context() + ssl_context.check_hostname = False + ssl_context.verify_mode = ssl.CERT_NONE + return ssl_context + + +class HuaweiCloudVectorConfig(BaseModel): + hosts: str + username: str + password: str + + @model_validator(mode="before") + @classmethod + def validate_config(cls, values: dict) -> dict: + if not values["hosts"]: + raise ValueError("config HOSTS is required") + return values + + def to_elasticsearch_params(self) -> dict[str, Any]: + params = { + "hosts": self.hosts.split(","), + "verify_certs": False, + "ssl_show_warn": False, + } + if self.username and self.password: + params["basic_auth"] = (self.username, self.password) + return params + + +class HuaweiCloudVector(BaseVector): + def __init__(self, index_name: str, config: HuaweiCloudVectorConfig, attributes: list): + super().__init__(index_name.lower()) + self._client = Elasticsearch(**config.to_elasticsearch_params()) + self._attributes = attributes + + def get_type(self) -> str: + return VectorType.HUAWEI_CLOUD + + def add_texts(self, documents: list[Document], embeddings: list[list[float]], **kwargs): + uuids = self._get_uuids(documents) + for i in range(len(documents)): + self._client.index( + index=self._collection_name, + id=uuids[i], + document={ + Field.CONTENT_KEY.value: documents[i].page_content, + Field.VECTOR.value: embeddings[i] or None, + Field.METADATA_KEY.value: documents[i].metadata or {}, + }, + ) + self._client.indices.refresh(index=self._collection_name) + return uuids + + def text_exists(self, id: str) -> bool: + return bool(self._client.exists(index=self._collection_name, id=id)) + + def delete_by_ids(self, ids: list[str]) -> None: + if not ids: + return + for id in ids: + self._client.delete(index=self._collection_name, id=id) + + def delete_by_metadata_field(self, key: str, value: str) -> None: + query_str = {"query": {"match": {f"metadata.{key}": f"{value}"}}} + results = self._client.search(index=self._collection_name, body=query_str) + ids = [hit["_id"] for hit in results["hits"]["hits"]] + if ids: + self.delete_by_ids(ids) + + def delete(self) -> None: + self._client.indices.delete(index=self._collection_name) + + def search_by_vector(self, query_vector: list[float], **kwargs: Any) -> list[Document]: + top_k = kwargs.get("top_k", 4) + + query = { + "size": top_k, + "query": { + "vector": { + Field.VECTOR.value: { + "vector": query_vector, + "topk": top_k, + } + } + } + } + + results = self._client.search(index=self._collection_name, body=query, request_timeout=120) + + docs_and_scores = [] + for hit in results["hits"]["hits"]: + docs_and_scores.append( + ( + Document( + page_content=hit["_source"][Field.CONTENT_KEY.value], + vector=hit["_source"][Field.VECTOR.value], + metadata=hit["_source"][Field.METADATA_KEY.value], + ), + hit["_score"], + ) + ) + + docs = [] + for doc, score in docs_and_scores: + score_threshold = float(kwargs.get("score_threshold") or 0.0) + if score > score_threshold: + if doc.metadata is not None: + doc.metadata["score"] = score + docs.append(doc) + + return docs + + def search_by_full_text(self, query: str, **kwargs: Any) -> list[Document]: + query_str = {"match": {Field.CONTENT_KEY.value: query}} + results = self._client.search(index=self._collection_name, query=query_str, size=kwargs.get("top_k", 4)) + docs = [] + for hit in results["hits"]["hits"]: + docs.append( + Document( + page_content=hit["_source"][Field.CONTENT_KEY.value], + vector=hit["_source"][Field.VECTOR.value], + metadata=hit["_source"][Field.METADATA_KEY.value], + ) + ) + + return docs + + def create(self, texts: list[Document], embeddings: list[list[float]], **kwargs): + metadatas = [d.metadata if d.metadata is not None else {} for d in texts] + self.create_collection(embeddings, metadatas) + self.add_texts(texts, embeddings, **kwargs) + + def create_collection( + self, + embeddings: list[list[float]], + metadatas: Optional[list[dict[Any, Any]]] = None, + index_params: Optional[dict] = None, + ): + lock_name = f"vector_indexing_lock_{self._collection_name}" + with redis_client.lock(lock_name, timeout=20): + collection_exist_cache_key = f"vector_indexing_{self._collection_name}" + if redis_client.get(collection_exist_cache_key): + logger.info(f"Collection {self._collection_name} already exists.") + return + + if not self._client.indices.exists(index=self._collection_name): + dim = len(embeddings[0]) + mappings = { + "properties": { + Field.CONTENT_KEY.value: {"type": "text"}, + Field.VECTOR.value: { # Make sure the dimension is correct here + "type": "vector", + "dimension": dim, + "indexing": True, + "algorithm": "GRAPH", + "metric": "cosine", + "neighbors": 32, + "efc": 128 + }, + Field.METADATA_KEY.value: { + "type": "object", + "properties": { + "doc_id": {"type": "keyword"} # Map doc_id to keyword type + }, + }, + } + } + settings = { + "index.vector": True + } + self._client.indices.create(index=self._collection_name, mappings=mappings, settings=settings) + + redis_client.set(collection_exist_cache_key, 1, ex=3600) + + +class HuaweiCloudVectorFactory(AbstractVectorFactory): + def init_vector(self, dataset: Dataset, attributes: list, embeddings: Embeddings) -> HuaweiCloudVector: + if dataset.index_struct_dict: + class_prefix: str = dataset.index_struct_dict["vector_store"]["class_prefix"] + collection_name = class_prefix.lower() + else: + dataset_id = dataset.id + collection_name = Dataset.gen_collection_name_by_id(dataset_id).lower() + dataset.index_struct = json.dumps(self.gen_index_struct_dict(VectorType.HUAWEI_CLOUD, collection_name)) + + return HuaweiCloudVector( + index_name=collection_name, + config=HuaweiCloudVectorConfig( + hosts=dify_config.HUAWEI_CLOUD_HOSTS, + username=dify_config.HUAWEI_CLOUD_USER, + password=dify_config.HUAWEI_CLOUD_PASSWORD, + ), + attributes=[], + ) diff --git a/api/core/rag/datasource/vdb/vector_factory.py b/api/core/rag/datasource/vdb/vector_factory.py index 6b4d7b0426..b793c034c6 100644 --- a/api/core/rag/datasource/vdb/vector_factory.py +++ b/api/core/rag/datasource/vdb/vector_factory.py @@ -152,6 +152,10 @@ class Vector: from core.rag.datasource.vdb.opengauss.opengauss import OpenGaussFactory return OpenGaussFactory + case VectorType.HUAWEI_CLOUD: + from core.rag.datasource.vdb.huawei.huawei_cloud_vector import HuaweiCloudVectorFactory + + return HuaweiCloudVectorFactory case _: raise ValueError(f"Vector store {vector_type} is not supported.") diff --git a/api/core/rag/datasource/vdb/vector_type.py b/api/core/rag/datasource/vdb/vector_type.py index e06988bfcb..19206821c6 100644 --- a/api/core/rag/datasource/vdb/vector_type.py +++ b/api/core/rag/datasource/vdb/vector_type.py @@ -25,3 +25,4 @@ class VectorType(StrEnum): TIDB_ON_QDRANT = "tidb_on_qdrant" OCEANBASE = "oceanbase" OPENGAUSS = "opengauss" + HUAWEI_CLOUD = "huawei_cloud" diff --git a/docker/.env.example b/docker/.env.example index b295f5cdf0..ce78a42bbf 100644 --- a/docker/.env.example +++ b/docker/.env.example @@ -564,6 +564,11 @@ OPENGAUSS_DATABASE=dify OPENGAUSS_MIN_CONNECTION=1 OPENGAUSS_MAX_CONNECTION=5 +# huawei cloud search service vector configurations, only available when VECTOR_STORE is `huawei_cloud` +HUAWEI_CLOUD_HOSTS=https://127.0.0.1:9200 +HUAWEI_CLOUD_USER=admin +HUAWEI_CLOUD_PASSWORD=admin + # Upstash Vector configuration, only available when VECTOR_STORE is `upstash` UPSTASH_VECTOR_URL=https://xxx-vector.upstash.io UPSTASH_VECTOR_TOKEN=dify diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index 2b6be77cc3..c1fd116498 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -259,6 +259,9 @@ x-shared-env: &shared-api-worker-env OPENGAUSS_DATABASE: ${OPENGAUSS_DATABASE:-dify} OPENGAUSS_MIN_CONNECTION: ${OPENGAUSS_MIN_CONNECTION:-1} OPENGAUSS_MAX_CONNECTION: ${OPENGAUSS_MAX_CONNECTION:-5} + HUAWEI_CLOUD_HOSTS: ${HUAWEI_CLOUD_HOSTS:-http://127.0.0.1:9200} + HUAWEI_CLOUD_USER: ${HUAWEI_CLOUD_USER:-admin} + HUAWEI_CLOUD_PASSWORD: ${HUAWEI_CLOUD_PASSWORD:-admin} UPSTASH_VECTOR_URL: ${UPSTASH_VECTOR_URL:-https://xxx-vector.upstash.io} UPSTASH_VECTOR_TOKEN: ${UPSTASH_VECTOR_TOKEN:-dify} UPLOAD_FILE_SIZE_LIMIT: ${UPLOAD_FILE_SIZE_LIMIT:-15}