dify/api/core/plugin/backwards_invocation/base.py
2024-09-26 17:22:39 +08:00

37 lines
1.3 KiB
Python

import json
from collections.abc import Generator
from typing import Generic, Optional, TypeVar
from pydantic import BaseModel
class BaseBackwardsInvocation:
    """Base class providing helpers shared by backwards-invocation handlers."""

    @classmethod
    def convert_to_event_stream(cls, response: Generator[BaseModel | dict | str, None, None] | BaseModel | dict):
        """Turn an invocation result into a stream of encoded byte frames.

        Every yielded frame is a ``bytes`` object terminated by a blank line
        (``\\n\\n``).

        Args:
            response: Either a single result (a pydantic model or a
                JSON-serializable object such as a dict), or a generator
                producing models, strings, or JSON-serializable objects.

        Yields:
            bytes: One frame per result or chunk.

            * Single model: its ``model_dump_json()`` output.
            * Single non-model: ``json.dumps`` of the value.
            * Streamed model: wrapped in ``BaseBackwardsInvocationResponse``
              as ``data`` and dumped to JSON.
            * Streamed string: emitted as ``event: <string>``.
            * Any other streamed chunk: ``json.dumps`` of the chunk.

            If iterating the generator raises an ``Exception``, one final
            frame carrying the error text in a
            ``BaseBackwardsInvocationResponse`` is yielded and the stream
            ends.
        """
        frame_end = b"\n\n"

        # Non-streaming result: emit exactly one frame and finish.
        if not isinstance(response, Generator):
            if isinstance(response, BaseModel):
                payload = response.model_dump_json()
            else:
                payload = json.dumps(response)
            yield payload.encode() + frame_end
            return

        # Streaming result: translate chunk by chunk. Failures raised while
        # consuming the generator are reported in-band instead of propagating.
        try:
            for item in response:
                if isinstance(item, BaseModel):
                    envelope = BaseBackwardsInvocationResponse(data=item)
                    yield envelope.model_dump_json().encode() + frame_end
                elif isinstance(item, str):
                    yield f"event: {item}\n\n".encode()
                else:
                    yield json.dumps(item).encode() + frame_end
        except Exception as exc:
            failure = BaseBackwardsInvocationResponse(error=str(exc)).model_dump_json()
            yield f"{failure}\n\n".encode()
# Payload types that may be carried in a response's ``data`` field.
T = TypeVar(
    "T",
    bound=BaseModel | dict | str | bool | int,
)


class BaseBackwardsInvocationResponse(BaseModel, Generic[T]):
    """Generic response envelope with an optional payload and an error string."""

    # Payload of the response; defaults to None when not supplied.
    data: T | None = None
    # Error message; defaults to the empty string.
    error: str = ""