feat: support huawei cloud vector database
This commit is contained in:
parent
ac80c04bd3
commit
fa275b9fdc
@ -22,6 +22,7 @@ from .vdb.baidu_vector_config import BaiduVectorDBConfig
|
||||
from .vdb.chroma_config import ChromaConfig
|
||||
from .vdb.couchbase_config import CouchbaseConfig
|
||||
from .vdb.elasticsearch_config import ElasticsearchConfig
|
||||
from .vdb.huawei_cloud_config import HuaweiCloudConfig
|
||||
from .vdb.lindorm_config import LindormConfig
|
||||
from .vdb.milvus_config import MilvusConfig
|
||||
from .vdb.myscale_config import MyScaleConfig
|
||||
@ -262,6 +263,7 @@ class MiddlewareConfig(
|
||||
VectorStoreConfig,
|
||||
AnalyticdbConfig,
|
||||
ChromaConfig,
|
||||
HuaweiCloudConfig,
|
||||
MilvusConfig,
|
||||
MyScaleConfig,
|
||||
OpenSearchConfig,
|
||||
|
25
api/configs/middleware/vdb/huawei_cloud_config.py
Normal file
25
api/configs/middleware/vdb/huawei_cloud_config.py
Normal file
@ -0,0 +1,25 @@
|
||||
from typing import Optional
|
||||
|
||||
from pydantic import Field
|
||||
from pydantic_settings import BaseSettings
|
||||
|
||||
|
||||
class HuaweiCloudConfig(BaseSettings):
|
||||
"""
|
||||
Configuration settings for Huawei cloud search service
|
||||
"""
|
||||
|
||||
HUAWEI_CLOUD_HOSTS: Optional[str] = Field(
|
||||
description="Hostname or IP address of the Huawei cloud search service instance",
|
||||
default=None,
|
||||
)
|
||||
|
||||
HUAWEI_CLOUD_USER: Optional[str] = Field(
|
||||
description="Username for authenticating with Huawei cloud search service",
|
||||
default=None,
|
||||
)
|
||||
|
||||
HUAWEI_CLOUD_PASSWORD: Optional[str] = Field(
|
||||
description="Password for authenticating with Huawei cloud search service",
|
||||
default=None,
|
||||
)
|
@ -664,6 +664,7 @@ class DatasetRetrievalSettingApi(Resource):
|
||||
| VectorType.COUCHBASE
|
||||
| VectorType.MILVUS
|
||||
| VectorType.OPENGAUSS
|
||||
| VectorType.HUAWEI_CLOUD
|
||||
):
|
||||
return {
|
||||
"retrieval_method": [
|
||||
@ -708,6 +709,7 @@ class DatasetRetrievalSettingMockApi(Resource):
|
||||
| VectorType.PGVECTOR
|
||||
| VectorType.LINDORM
|
||||
| VectorType.OPENGAUSS
|
||||
| VectorType.HUAWEI_CLOUD
|
||||
):
|
||||
return {
|
||||
"retrieval_method": [
|
||||
|
0
api/core/rag/datasource/vdb/huawei/__init__.py
Normal file
0
api/core/rag/datasource/vdb/huawei/__init__.py
Normal file
215
api/core/rag/datasource/vdb/huawei/huawei_cloud_vector.py
Normal file
215
api/core/rag/datasource/vdb/huawei/huawei_cloud_vector.py
Normal file
@ -0,0 +1,215 @@
|
||||
import json
|
||||
import logging
|
||||
import ssl
|
||||
from typing import Any, Optional
|
||||
|
||||
from configs import dify_config
|
||||
from core.rag.datasource.vdb.field import Field
|
||||
from core.rag.datasource.vdb.vector_base import BaseVector
|
||||
from core.rag.datasource.vdb.vector_factory import AbstractVectorFactory
|
||||
from core.rag.datasource.vdb.vector_type import VectorType
|
||||
from core.rag.embedding.embedding_base import Embeddings
|
||||
from core.rag.models.document import Document
|
||||
from elasticsearch import Elasticsearch
|
||||
from extensions.ext_redis import redis_client
|
||||
from models.dataset import Dataset
|
||||
from pydantic import BaseModel, model_validator
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def create_ssl_context() -> ssl.SSLContext:
|
||||
ssl_context = ssl.create_default_context()
|
||||
ssl_context.check_hostname = False
|
||||
ssl_context.verify_mode = ssl.CERT_NONE
|
||||
return ssl_context
|
||||
|
||||
|
||||
class HuaweiCloudVectorConfig(BaseModel):
|
||||
hosts: str
|
||||
username: str
|
||||
password: str
|
||||
|
||||
@model_validator(mode="before")
|
||||
@classmethod
|
||||
def validate_config(cls, values: dict) -> dict:
|
||||
if not values["hosts"]:
|
||||
raise ValueError("config HOSTS is required")
|
||||
return values
|
||||
|
||||
def to_elasticsearch_params(self) -> dict[str, Any]:
|
||||
params = {
|
||||
"hosts": self.hosts.split(","),
|
||||
"verify_certs": False,
|
||||
"ssl_show_warn": False,
|
||||
}
|
||||
if self.username and self.password:
|
||||
params["basic_auth"] = (self.username, self.password)
|
||||
return params
|
||||
|
||||
|
||||
class HuaweiCloudVector(BaseVector):
|
||||
def __init__(self, index_name: str, config: HuaweiCloudVectorConfig, attributes: list):
|
||||
super().__init__(index_name.lower())
|
||||
self._client = Elasticsearch(**config.to_elasticsearch_params())
|
||||
self._attributes = attributes
|
||||
|
||||
def get_type(self) -> str:
|
||||
return VectorType.HUAWEI_CLOUD
|
||||
|
||||
def add_texts(self, documents: list[Document], embeddings: list[list[float]], **kwargs):
|
||||
uuids = self._get_uuids(documents)
|
||||
for i in range(len(documents)):
|
||||
self._client.index(
|
||||
index=self._collection_name,
|
||||
id=uuids[i],
|
||||
document={
|
||||
Field.CONTENT_KEY.value: documents[i].page_content,
|
||||
Field.VECTOR.value: embeddings[i] or None,
|
||||
Field.METADATA_KEY.value: documents[i].metadata or {},
|
||||
},
|
||||
)
|
||||
self._client.indices.refresh(index=self._collection_name)
|
||||
return uuids
|
||||
|
||||
def text_exists(self, id: str) -> bool:
|
||||
return bool(self._client.exists(index=self._collection_name, id=id))
|
||||
|
||||
def delete_by_ids(self, ids: list[str]) -> None:
|
||||
if not ids:
|
||||
return
|
||||
for id in ids:
|
||||
self._client.delete(index=self._collection_name, id=id)
|
||||
|
||||
def delete_by_metadata_field(self, key: str, value: str) -> None:
|
||||
query_str = {"query": {"match": {f"metadata.{key}": f"{value}"}}}
|
||||
results = self._client.search(index=self._collection_name, body=query_str)
|
||||
ids = [hit["_id"] for hit in results["hits"]["hits"]]
|
||||
if ids:
|
||||
self.delete_by_ids(ids)
|
||||
|
||||
def delete(self) -> None:
|
||||
self._client.indices.delete(index=self._collection_name)
|
||||
|
||||
def search_by_vector(self, query_vector: list[float], **kwargs: Any) -> list[Document]:
|
||||
top_k = kwargs.get("top_k", 4)
|
||||
|
||||
query = {
|
||||
"size": top_k,
|
||||
"query": {
|
||||
"vector": {
|
||||
Field.VECTOR.value: {
|
||||
"vector": query_vector,
|
||||
"topk": top_k,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
results = self._client.search(index=self._collection_name, body=query, request_timeout=120)
|
||||
|
||||
docs_and_scores = []
|
||||
for hit in results["hits"]["hits"]:
|
||||
docs_and_scores.append(
|
||||
(
|
||||
Document(
|
||||
page_content=hit["_source"][Field.CONTENT_KEY.value],
|
||||
vector=hit["_source"][Field.VECTOR.value],
|
||||
metadata=hit["_source"][Field.METADATA_KEY.value],
|
||||
),
|
||||
hit["_score"],
|
||||
)
|
||||
)
|
||||
|
||||
docs = []
|
||||
for doc, score in docs_and_scores:
|
||||
score_threshold = float(kwargs.get("score_threshold") or 0.0)
|
||||
if score > score_threshold:
|
||||
if doc.metadata is not None:
|
||||
doc.metadata["score"] = score
|
||||
docs.append(doc)
|
||||
|
||||
return docs
|
||||
|
||||
def search_by_full_text(self, query: str, **kwargs: Any) -> list[Document]:
|
||||
query_str = {"match": {Field.CONTENT_KEY.value: query}}
|
||||
results = self._client.search(index=self._collection_name, query=query_str, size=kwargs.get("top_k", 4))
|
||||
docs = []
|
||||
for hit in results["hits"]["hits"]:
|
||||
docs.append(
|
||||
Document(
|
||||
page_content=hit["_source"][Field.CONTENT_KEY.value],
|
||||
vector=hit["_source"][Field.VECTOR.value],
|
||||
metadata=hit["_source"][Field.METADATA_KEY.value],
|
||||
)
|
||||
)
|
||||
|
||||
return docs
|
||||
|
||||
def create(self, texts: list[Document], embeddings: list[list[float]], **kwargs):
|
||||
metadatas = [d.metadata if d.metadata is not None else {} for d in texts]
|
||||
self.create_collection(embeddings, metadatas)
|
||||
self.add_texts(texts, embeddings, **kwargs)
|
||||
|
||||
def create_collection(
|
||||
self,
|
||||
embeddings: list[list[float]],
|
||||
metadatas: Optional[list[dict[Any, Any]]] = None,
|
||||
index_params: Optional[dict] = None,
|
||||
):
|
||||
lock_name = f"vector_indexing_lock_{self._collection_name}"
|
||||
with redis_client.lock(lock_name, timeout=20):
|
||||
collection_exist_cache_key = f"vector_indexing_{self._collection_name}"
|
||||
if redis_client.get(collection_exist_cache_key):
|
||||
logger.info(f"Collection {self._collection_name} already exists.")
|
||||
return
|
||||
|
||||
if not self._client.indices.exists(index=self._collection_name):
|
||||
dim = len(embeddings[0])
|
||||
mappings = {
|
||||
"properties": {
|
||||
Field.CONTENT_KEY.value: {"type": "text"},
|
||||
Field.VECTOR.value: { # Make sure the dimension is correct here
|
||||
"type": "vector",
|
||||
"dimension": dim,
|
||||
"indexing": True,
|
||||
"algorithm": "GRAPH",
|
||||
"metric": "cosine",
|
||||
"neighbors": 32,
|
||||
"efc": 128
|
||||
},
|
||||
Field.METADATA_KEY.value: {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"doc_id": {"type": "keyword"} # Map doc_id to keyword type
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
settings = {
|
||||
"index.vector": True
|
||||
}
|
||||
self._client.indices.create(index=self._collection_name, mappings=mappings, settings=settings)
|
||||
|
||||
redis_client.set(collection_exist_cache_key, 1, ex=3600)
|
||||
|
||||
|
||||
class HuaweiCloudVectorFactory(AbstractVectorFactory):
|
||||
def init_vector(self, dataset: Dataset, attributes: list, embeddings: Embeddings) -> HuaweiCloudVector:
|
||||
if dataset.index_struct_dict:
|
||||
class_prefix: str = dataset.index_struct_dict["vector_store"]["class_prefix"]
|
||||
collection_name = class_prefix.lower()
|
||||
else:
|
||||
dataset_id = dataset.id
|
||||
collection_name = Dataset.gen_collection_name_by_id(dataset_id).lower()
|
||||
dataset.index_struct = json.dumps(self.gen_index_struct_dict(VectorType.HUAWEI_CLOUD, collection_name))
|
||||
|
||||
return HuaweiCloudVector(
|
||||
index_name=collection_name,
|
||||
config=HuaweiCloudVectorConfig(
|
||||
hosts=dify_config.HUAWEI_CLOUD_HOSTS,
|
||||
username=dify_config.HUAWEI_CLOUD_USER,
|
||||
password=dify_config.HUAWEI_CLOUD_PASSWORD,
|
||||
),
|
||||
attributes=[],
|
||||
)
|
@ -152,6 +152,10 @@ class Vector:
|
||||
from core.rag.datasource.vdb.opengauss.opengauss import OpenGaussFactory
|
||||
|
||||
return OpenGaussFactory
|
||||
case VectorType.HUAWEI_CLOUD:
|
||||
from core.rag.datasource.vdb.huawei.huawei_cloud_vector import HuaweiCloudVectorFactory
|
||||
|
||||
return HuaweiCloudVectorFactory
|
||||
case _:
|
||||
raise ValueError(f"Vector store {vector_type} is not supported.")
|
||||
|
||||
|
@ -25,3 +25,4 @@ class VectorType(StrEnum):
|
||||
TIDB_ON_QDRANT = "tidb_on_qdrant"
|
||||
OCEANBASE = "oceanbase"
|
||||
OPENGAUSS = "opengauss"
|
||||
HUAWEI_CLOUD = "huawei_cloud"
|
||||
|
@ -564,6 +564,11 @@ OPENGAUSS_DATABASE=dify
|
||||
OPENGAUSS_MIN_CONNECTION=1
|
||||
OPENGAUSS_MAX_CONNECTION=5
|
||||
|
||||
# huawei cloud search service vector configurations, only available when VECTOR_STORE is `huawei_cloud`
|
||||
HUAWEI_CLOUD_HOSTS=https://127.0.0.1:9200
|
||||
HUAWEI_CLOUD_USER=admin
|
||||
HUAWEI_CLOUD_PASSWORD=admin
|
||||
|
||||
# Upstash Vector configuration, only available when VECTOR_STORE is `upstash`
|
||||
UPSTASH_VECTOR_URL=https://xxx-vector.upstash.io
|
||||
UPSTASH_VECTOR_TOKEN=dify
|
||||
|
@ -259,6 +259,9 @@ x-shared-env: &shared-api-worker-env
|
||||
OPENGAUSS_DATABASE: ${OPENGAUSS_DATABASE:-dify}
|
||||
OPENGAUSS_MIN_CONNECTION: ${OPENGAUSS_MIN_CONNECTION:-1}
|
||||
OPENGAUSS_MAX_CONNECTION: ${OPENGAUSS_MAX_CONNECTION:-5}
|
||||
HUAWEI_CLOUD_HOSTS: ${HUAWEI_CLOUD_HOSTS:-http://127.0.0.1:9200}
|
||||
HUAWEI_CLOUD_USER: ${HUAWEI_CLOUD_USER:-admin}
|
||||
HUAWEI_CLOUD_PASSWORD: ${HUAWEI_CLOUD_PASSWORD:-admin}
|
||||
UPSTASH_VECTOR_URL: ${UPSTASH_VECTOR_URL:-https://xxx-vector.upstash.io}
|
||||
UPSTASH_VECTOR_TOKEN: ${UPSTASH_VECTOR_TOKEN:-dify}
|
||||
UPLOAD_FILE_SIZE_LIMIT: ${UPLOAD_FILE_SIZE_LIMIT:-15}
|
||||
|
Loading…
Reference in New Issue
Block a user