# dify/api/core/tools/custom_tool/provider.py
# 2024-09-20 23:48:48 +08:00
#
# 177 lines
# 6.5 KiB
# Python

from pydantic import Field
from core.entities.provider_entities import ProviderConfig
from core.tools.__base.tool_provider import ToolProviderController
from core.tools.custom_tool.tool import ApiTool
from core.tools.entities.common_entities import I18nObject
from core.tools.entities.tool_bundle import ApiToolBundle
from core.tools.entities.tool_entities import (
ApiProviderAuthType,
ToolProviderEntity,
ToolProviderIdentity,
ToolProviderType,
)
from extensions.ext_database import db
from models.tools import ApiToolProvider
class ApiToolProviderController(ToolProviderController):
    """
    Tool provider controller for custom (API based) tool providers.

    Keeps the provider id, the owning tenant id and the list of ``ApiTool``
    objects built from the provider's API tool bundles.
    """

    provider_id: str
    tenant_id: str
    # NOTE(review): this class-level default is a pydantic ``Field`` declaration.
    # It only yields a fresh list per instance if the base class is a pydantic
    # model; ``__init__`` therefore assigns the list explicitly.
    tools: list[ApiTool] = Field(default_factory=list)

    def __init__(self, entity: ToolProviderEntity, provider_id: str, tenant_id: str) -> None:
        """
        create the controller

        :param entity: the provider entity (identity and credentials schema)
        :param provider_id: the id of the provider
        :param tenant_id: the id of the tenant owning the provider
        """
        super().__init__(entity)
        self.provider_id = provider_id
        self.tenant_id = tenant_id
        # Start every instance with its own empty tool list so that
        # get_tools / get_tool can rely on ``self.tools`` being a real list.
        self.tools = []

    @staticmethod
    def from_db(db_provider: ApiToolProvider, auth_type: ApiProviderAuthType) -> "ApiToolProviderController":
        """
        build a controller from a database provider record

        :param db_provider: the api tool provider loaded from the database
        :param auth_type: the auth type; ``API_KEY`` adds the api key credential fields
        :return: the controller
        """
        # The auth type selector is always part of the schema.
        credentials_schema = {
            "auth_type": ProviderConfig(
                name="auth_type",
                required=True,
                type=ProviderConfig.Type.SELECT,
                options=[
                    # NOTE(review): the zh_Hans label is empty here - confirm whether
                    # a translation was intended.
                    ProviderConfig.Option(value="none", label=I18nObject(en_US="None", zh_Hans="")),
                    ProviderConfig.Option(value="api_key", label=I18nObject(en_US="api_key", zh_Hans="api_key")),
                ],
                default="none",
                help=I18nObject(en_US="The auth type of the api provider", zh_Hans="api provider 的认证类型"),
            )
        }

        # Only api key auth needs extra credential fields; any other auth type
        # keeps the schema as is.
        if auth_type == ApiProviderAuthType.API_KEY:
            credentials_schema = {
                **credentials_schema,
                "api_key_header": ProviderConfig(
                    name="api_key_header",
                    required=False,
                    default="api_key",
                    type=ProviderConfig.Type.TEXT_INPUT,
                    help=I18nObject(en_US="The header name of the api key", zh_Hans="携带 api key 的 header 名称"),
                ),
                "api_key_value": ProviderConfig(
                    name="api_key_value",
                    required=True,
                    type=ProviderConfig.Type.SECRET_INPUT,
                    help=I18nObject(en_US="The api key", zh_Hans="api key的值"),
                ),
                "api_key_header_prefix": ProviderConfig(
                    name="api_key_header_prefix",
                    required=False,
                    default="basic",
                    type=ProviderConfig.Type.SELECT,
                    help=I18nObject(en_US="The prefix of the api key header", zh_Hans="api key header 的前缀"),
                    options=[
                        ProviderConfig.Option(value="basic", label=I18nObject(en_US="Basic", zh_Hans="Basic")),
                        ProviderConfig.Option(value="bearer", label=I18nObject(en_US="Bearer", zh_Hans="Bearer")),
                        ProviderConfig.Option(value="custom", label=I18nObject(en_US="Custom", zh_Hans="Custom")),
                    ],
                ),
            }

        # The provider may have no associated user; fall back to an empty author.
        user = db_provider.user
        user_name = user.name if user else ""

        return ApiToolProviderController(
            entity=ToolProviderEntity(
                identity=ToolProviderIdentity(
                    author=user_name,
                    name=db_provider.name,
                    label=I18nObject(en_US=db_provider.name, zh_Hans=db_provider.name),
                    description=I18nObject(en_US=db_provider.description, zh_Hans=db_provider.description),
                    icon=db_provider.icon,
                ),
                credentials_schema=credentials_schema,
            ),
            provider_id=db_provider.id or "",
            tenant_id=db_provider.tenant_id or "",
        )

    @property
    def provider_type(self) -> ToolProviderType:
        """
        :return: the provider type, always ``ToolProviderType.API``
        """
        return ToolProviderType.API

    def _parse_tool_bundle(self, tool_bundle: ApiToolBundle) -> ApiTool:
        """
        parse tool bundle to tool

        :param tool_bundle: the tool bundle
        :return: the tool
        """
        # A missing summary is rendered as an empty description.
        summary = tool_bundle.summary or ""
        return ApiTool(
            **{
                "api_bundle": tool_bundle,
                "identity": {
                    "author": tool_bundle.author,
                    "name": tool_bundle.operation_id,
                    "label": {"en_US": tool_bundle.operation_id, "zh_Hans": tool_bundle.operation_id},
                    "icon": self.entity.identity.icon,
                    "provider": self.provider_id,
                },
                "description": {
                    "human": {"en_US": summary, "zh_Hans": summary},
                    "llm": summary,
                },
                "parameters": tool_bundle.parameters or [],
            }
        )

    def load_bundled_tools(self, tools: list[ApiToolBundle]) -> list[ApiTool]:
        """
        load bundled tools

        :param tools: the bundled tools
        :return: the tools
        """
        self.tools = [self._parse_tool_bundle(tool) for tool in tools]
        return self.tools

    def get_tools(self, tenant_id: str) -> list[ApiTool]:
        """
        fetch tools, from the in-memory list when populated, otherwise from the database

        :param tenant_id: the tenant id
        :return: the tools
        """
        # ``self.tools`` is never None, so test for "already loaded" by
        # truthiness; an ``is not None`` check would skip the query forever.
        if self.tools:
            return self.tools

        # get tenant api providers
        db_providers: list[ApiToolProvider] = (
            db.session.query(ApiToolProvider)
            .filter(ApiToolProvider.tenant_id == tenant_id, ApiToolProvider.name == self.entity.identity.name)
            .all()
        )

        tools: list[ApiTool] = []
        for db_provider in db_providers:
            tools.extend(self._parse_tool_bundle(tool) for tool in db_provider.tools)

        self.tools = tools
        return tools

    def get_tool(self, tool_name: str) -> ApiTool:
        """
        get tool by name

        :param tool_name: the name of the tool
        :return: the tool
        :raises ValueError: if no tool with that name exists
        """
        # Load lazily when nothing is cached yet (an empty list, not None,
        # is the "not loaded" state).
        if not self.tools:
            self.get_tools(self.tenant_id)

        for tool in self.tools:
            if tool.entity.identity.name == tool_name:
                return tool

        raise ValueError(f"tool {tool_name} not found")