Compare commits: main...fix/redis-slow-in-gevent (7 commits)
Commits:

- 0c6ad1df64
- 40fb522f56
- 96d9951d5c
- d36201f7ff
- b46c7935b1
- 206e6e1e7c
- 8685c0d48b
.github/workflows/build-push.yml (1 change, vendored)
@@ -5,6 +5,7 @@ on:
     branches:
       - "main"
       - "deploy/dev"
+      - "fix/redis-slow-in-gevent"
   release:
     types: [published]
 
api/configs/middleware/cache/redis_config.py (5 changes, vendored)
@@ -34,6 +34,11 @@ class RedisConfig(BaseSettings):
         default=0,
     )
 
+    REDIS_MAX_CONNECTIONS: PositiveInt = Field(
+        description="Maximum number of connections to Redis",
+        default=200,
+    )
+
     REDIS_USE_SSL: bool = Field(
         description="Enable SSL/TLS for the Redis connection",
         default=False,
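For context on how a cap like `REDIS_MAX_CONNECTIONS` is typically consumed (the wiring is not shown in this compare), here is a minimal redis-py sketch; the host, port, and variable names are illustrative assumptions, not taken from the repository:

```python
import redis

REDIS_HOST = "localhost"      # assumption: not part of this diff
REDIS_PORT = 6379             # assumption: not part of this diff
REDIS_MAX_CONNECTIONS = 200   # matches the new default introduced above

# A bounded connection pool caps how many sockets to Redis can exist at once.
pool = redis.ConnectionPool(
    host=REDIS_HOST,
    port=REDIS_PORT,
    max_connections=REDIS_MAX_CONNECTIONS,
)
redis_client = redis.Redis(connection_pool=pool)
redis_client.ping()  # raises redis.ConnectionError if the server is unreachable
```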
@@ -226,13 +226,11 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
         is_first_conversation = True
 
         # init generate records
-        (conversation, message) = self._init_generate_records(application_generate_entity, conversation)
-
-        if is_first_conversation:
-            # update conversation features
-            conversation.override_model_configs = workflow.features
-            db.session.commit()
-            db.session.refresh(conversation)
+        (conversation, message) = self._init_generate_records(
+            application_generate_entity=application_generate_entity,
+            conversation=conversation,
+            override_model_configs=workflow.features_dict if is_first_conversation else None,
+        )
 
         # init queue manager
         queue_manager = MessageBasedAppQueueManager(
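A toy sketch (plain SQLAlchemy with a stand-in model, not Dify's Conversation) of what the new call shape buys: the override value is written as part of the initial INSERT instead of an INSERT followed by an UPDATE, a second commit, and a refresh:

```python
from sqlalchemy import Column, Integer, Text, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Conversation(Base):  # toy stand-in, not models.model.Conversation
    __tablename__ = "conversations"
    id = Column(Integer, primary_key=True)
    override_model_configs = Column(Text, nullable=True)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    # old pattern: create, commit, then update + commit + refresh (two writes, one reload)
    conv = Conversation()
    session.add(conv)
    session.commit()
    conv.override_model_configs = '{"features": {}}'
    session.commit()
    session.refresh(conv)

    # new pattern: pass the value up front, a single INSERT is enough
    conv2 = Conversation(override_model_configs='{"features": {}}')
    session.add(conv2)
    session.commit()
```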
@@ -1,4 +1,5 @@
 import logging
+import time
 from collections.abc import Mapping
 from typing import Any, cast
 
@@ -101,6 +102,9 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
         ):
             return
 
+        # trace start time
+        start_time = time.perf_counter()
+
         # Init conversation variables
         stmt = select(ConversationVariable).where(
             ConversationVariable.app_id == self.conversation.app_id,
@@ -128,6 +132,13 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
         conversation_dialogue_count = self.conversation.dialogue_count
         db.session.commit()
 
+        # trace end time
+        end_time = time.perf_counter()
+        print(f"conversation_dialogue_count time: {end_time - start_time}")
+
+        # trace start time
+        start_time = time.perf_counter()
+
         # Create a variable pool.
         system_inputs = {
             SystemVariableKey.QUERY: query,
@@ -151,6 +162,10 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
         # init graph
         graph = self._init_graph(graph_config=workflow.graph_dict)
 
+        # trace end time
+        end_time = time.perf_counter()
+        print(f"init graph time: {end_time - start_time}")
+
         db.session.close()
 
         # RUN WORKFLOW
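The instrumentation above repeats the same perf_counter/print pair around each block. A small context manager (standard library only, not part of this compare) captures the same measurement with less repetition:

```python
import time
from contextlib import contextmanager

@contextmanager
def trace(label: str):
    """Print how long the wrapped block took, mirroring the ad-hoc prints in the hunks above."""
    start = time.perf_counter()
    try:
        yield
    finally:
        print(f"{label} time: {time.perf_counter() - start}")

# standalone usage; in the runner this would wrap the dialogue-count query or the _init_graph call
with trace("example sleep"):
    time.sleep(0.05)
```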
@@ -15,7 +15,6 @@ from core.app.entities.queue_entities import (
     QueuePingEvent,
     QueueStopEvent,
 )
-from extensions.ext_redis import redis_client
 
 
 class PublishFrom(Enum):
@@ -32,10 +31,10 @@ class AppQueueManager:
         self._user_id = user_id
         self._invoke_from = invoke_from
 
-        user_prefix = "account" if self._invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} else "end-user"
-        redis_client.setex(
-            AppQueueManager._generate_task_belong_cache_key(self._task_id), 1800, f"{user_prefix}-{self._user_id}"
-        )
+        # user_prefix = "account" if self._invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} else "end-user"
+        # redis_client.setex(
+        #     AppQueueManager._generate_task_belong_cache_key(self._task_id), 1800, f"{user_prefix}-{self._user_id}"
+        # )
 
         q = queue.Queue()
 
@@ -114,26 +113,27 @@ class AppQueueManager:
         Set task stop flag
         :return:
         """
-        result = redis_client.get(cls._generate_task_belong_cache_key(task_id))
-        if result is None:
-            return
+        return
+        # result = redis_client.get(cls._generate_task_belong_cache_key(task_id))
+        # if result is None:
+        #     return
 
-        user_prefix = "account" if invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} else "end-user"
-        if result.decode("utf-8") != f"{user_prefix}-{user_id}":
-            return
+        # user_prefix = "account" if invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} else "end-user"
+        # if result.decode("utf-8") != f"{user_prefix}-{user_id}":
+        #     return
 
-        stopped_cache_key = cls._generate_stopped_cache_key(task_id)
-        redis_client.setex(stopped_cache_key, 600, 1)
+        # stopped_cache_key = cls._generate_stopped_cache_key(task_id)
+        # redis_client.setex(stopped_cache_key, 600, 1)
 
     def _is_stopped(self) -> bool:
         """
         Check if task is stopped
         :return:
         """
-        stopped_cache_key = AppQueueManager._generate_stopped_cache_key(self._task_id)
-        result = redis_client.get(stopped_cache_key)
-        if result is not None:
-            return True
+        # stopped_cache_key = AppQueueManager._generate_stopped_cache_key(self._task_id)
+        # result = redis_client.get(stopped_cache_key)
+        # if result is not None:
+        #     return True
 
         return False
 
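Since the branch name points at Redis being slow under gevent and this hunk simply disables the setex/get calls, a standalone latency probe can confirm whether those round trips are the bottleneck. This is a sketch under assumptions (a local Redis on the default port; illustrative key names), not part of the compare; run it with and without monkey-patching and compare the timings:

```python
import time

from gevent import monkey

monkey.patch_all()  # comment this out for the non-gevent baseline

import redis  # imported after patch_all so its sockets are gevent-aware

client = redis.Redis(host="localhost", port=6379, db=0)  # adjust to your environment

start = time.perf_counter()
for i in range(1000):
    # mirrors the cache pattern disabled above: a setex followed by a get
    client.setex(f"probe:task:{i}", 1800, "end-user-probe")
    client.get(f"probe:task:{i}")
print(f"1000 setex/get round trips: {time.perf_counter() - start:.3f}s")
```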
@@ -1,8 +1,9 @@
 import json
 import logging
-from collections.abc import Generator
+import uuid
+from collections.abc import Generator, Mapping
 from datetime import datetime, timezone
-from typing import Optional, Union
+from typing import Any, Optional, Union
 
 from sqlalchemy import and_
 
@@ -137,6 +138,7 @@ class MessageBasedAppGenerator(BaseAppGenerator):
             AdvancedChatAppGenerateEntity,
         ],
         conversation: Optional[Conversation] = None,
+        override_model_configs: Optional[Mapping[str, Any]] = None,
     ) -> tuple[Conversation, Message]:
         """
         Initialize generate records
@@ -158,14 +160,12 @@ class MessageBasedAppGenerator(BaseAppGenerator):
 
         if isinstance(application_generate_entity, AdvancedChatAppGenerateEntity):
             app_model_config_id = None
-            override_model_configs = None
             model_provider = None
             model_id = None
         else:
             app_model_config_id = app_config.app_model_config_id
             model_provider = application_generate_entity.model_conf.provider
             model_id = application_generate_entity.model_conf.model
-            override_model_configs = None
             if app_config.app_model_config_from == EasyUIBasedAppModelConfigFrom.ARGS and app_config.app_mode in {
                 AppMode.AGENT_CHAT,
                 AppMode.CHAT,
@@ -177,61 +177,63 @@ class MessageBasedAppGenerator(BaseAppGenerator):
         introduction = self._get_conversation_introduction(application_generate_entity)
 
         if not conversation:
-            conversation = Conversation(
-                app_id=app_config.app_id,
-                app_model_config_id=app_model_config_id,
-                model_provider=model_provider,
-                model_id=model_id,
-                override_model_configs=json.dumps(override_model_configs) if override_model_configs else None,
-                mode=app_config.app_mode.value,
-                name="New conversation",
-                inputs=application_generate_entity.inputs,
-                introduction=introduction,
-                system_instruction="",
-                system_instruction_tokens=0,
-                status="normal",
-                invoke_from=application_generate_entity.invoke_from.value,
-                from_source=from_source,
-                from_end_user_id=end_user_id,
-                from_account_id=account_id,
-            )
+            with db.Session(bind=db.engine, expire_on_commit=False) as session:
+                conversation = Conversation()
+                conversation.id = str(uuid.uuid4())
+                conversation.app_id = app_config.app_id
+                conversation.app_model_config_id = app_model_config_id
+                conversation.model_provider = model_provider
+                conversation.model_id = model_id
+                conversation.override_model_configs = (
+                    json.dumps(override_model_configs) if override_model_configs else None
+                )
+                conversation.mode = app_config.app_mode.value
+                conversation.name = "New conversation"
+                conversation.inputs = application_generate_entity.inputs
+                conversation.introduction = introduction
+                conversation.system_instruction = ""
+                conversation.system_instruction_tokens = 0
+                conversation.status = "normal"
+                conversation.invoke_from = application_generate_entity.invoke_from.value
+                conversation.from_source = from_source
+                conversation.from_end_user_id = end_user_id
+                conversation.from_account_id = account_id
 
-            db.session.add(conversation)
-            db.session.commit()
-            db.session.refresh(conversation)
+                session.add(conversation)
+                session.commit()
+                session.refresh(conversation)
         else:
             conversation.updated_at = datetime.now(timezone.utc).replace(tzinfo=None)
             db.session.commit()
 
-        message = Message(
-            app_id=app_config.app_id,
-            model_provider=model_provider,
-            model_id=model_id,
-            override_model_configs=json.dumps(override_model_configs) if override_model_configs else None,
-            conversation_id=conversation.id,
-            inputs=application_generate_entity.inputs,
-            query=application_generate_entity.query or "",
-            message="",
-            message_tokens=0,
-            message_unit_price=0,
-            message_price_unit=0,
-            answer="",
-            answer_tokens=0,
-            answer_unit_price=0,
-            answer_price_unit=0,
-            parent_message_id=getattr(application_generate_entity, "parent_message_id", None),
-            provider_response_latency=0,
-            total_price=0,
-            currency="USD",
-            invoke_from=application_generate_entity.invoke_from.value,
-            from_source=from_source,
-            from_end_user_id=end_user_id,
-            from_account_id=account_id,
-        )
+        with db.Session(bind=db.engine, expire_on_commit=False) as session:
+            message = Message()
+            message.app_id = app_config.app_id
+            message.model_provider = model_provider
+            message.model_id = model_id
+            message.override_model_configs = json.dumps(override_model_configs) if override_model_configs else None
+            message.conversation_id = conversation.id
+            message.inputs = application_generate_entity.inputs
+            message.query = application_generate_entity.query or ""
+            message.message = ""
+            message.message_tokens = 0
+            message.message_unit_price = 0
+            message.answer = ""
+            message.answer_tokens = 0
+            message.answer_unit_price = 0
+            message.answer_price_unit = 0
+            message.parent_message_id = getattr(application_generate_entity, "parent_message_id", None)
+            message.provider_response_latency = 0
+            message.total_price = 0
+            message.currency = "USD"
+            message.invoke_from = application_generate_entity.invoke_from.value
+            message.from_source = from_source
+            message.from_end_user_id = end_user_id
+            message.from_account_id = account_id
 
-        db.session.add(message)
-        db.session.commit()
-        db.session.refresh(message)
+            session.add(message)
+            session.commit()
+            session.refresh(message)
 
         for file in application_generate_entity.files:
             message_file = MessageFile(
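The rewritten creation path assigns attributes inside db.Session(bind=db.engine, expire_on_commit=False). A minimal sketch with plain SQLAlchemy and a toy model (an assumption, not Dify's models) of what expire_on_commit=False changes: instance attributes stay loaded after commit(), so later reads such as conversation.id neither trigger an implicit refresh SELECT nor fail once the session is closed:

```python
import uuid

from sqlalchemy import Column, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Conversation(Base):  # toy stand-in for the ORM model in the diff
    __tablename__ = "conversations"
    id = Column(String, primary_key=True)
    name = Column(String)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine, expire_on_commit=False) as session:
    conversation = Conversation()
    conversation.id = str(uuid.uuid4())   # id assigned up front, as in the hunk above
    conversation.name = "New conversation"
    session.add(conversation)
    session.commit()

# No extra query here: the detached instance still carries its loaded state.
# With the default expire_on_commit=True this access would require a refresh
# and would raise DetachedInstanceError outside the session.
print(conversation.id, conversation.name)
```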