Compare commits

...

7 Commits

Author SHA1 Message Date
takatost
0c6ad1df64 optimize conversation / message insert 2024-10-25 00:31:15 -07:00
takatost
40fb522f56 add trace 2024-10-24 02:34:10 -07:00
takatost
96d9951d5c disable redis cache 2024-10-24 02:03:29 -07:00
takatost
d36201f7ff remove unused code 2024-10-24 01:16:32 -07:00
takatost
b46c7935b1 test disable socket patch 2024-10-24 01:12:04 -07:00
takatost
206e6e1e7c add debug print 2024-10-24 00:54:10 -07:00
takatost
8685c0d48b test replace gevent socket patch in redis 2024-10-24 00:11:45 -07:00
6 changed files with 98 additions and 77 deletions

View File

@ -5,6 +5,7 @@ on:
branches:
- "main"
- "deploy/dev"
- "fix/redis-slow-in-gevent"
release:
types: [published]

View File

@ -34,6 +34,11 @@ class RedisConfig(BaseSettings):
default=0,
)
REDIS_MAX_CONNECTIONS: PositiveInt = Field(
description="Maximum number of connections to Redis",
default=200,
)
REDIS_USE_SSL: bool = Field(
description="Enable SSL/TLS for the Redis connection",
default=False,

View File

@ -226,13 +226,11 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
is_first_conversation = True
# init generate records
(conversation, message) = self._init_generate_records(application_generate_entity, conversation)
if is_first_conversation:
# update conversation features
conversation.override_model_configs = workflow.features
db.session.commit()
db.session.refresh(conversation)
(conversation, message) = self._init_generate_records(
application_generate_entity=application_generate_entity,
conversation=conversation,
override_model_configs=workflow.features_dict if is_first_conversation else None,
)
# init queue manager
queue_manager = MessageBasedAppQueueManager(

View File

@ -1,4 +1,5 @@
import logging
import time
from collections.abc import Mapping
from typing import Any, cast
@ -101,6 +102,9 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
):
return
# trace start time
start_time = time.perf_counter()
# Init conversation variables
stmt = select(ConversationVariable).where(
ConversationVariable.app_id == self.conversation.app_id,
@ -128,6 +132,13 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
conversation_dialogue_count = self.conversation.dialogue_count
db.session.commit()
# trace end time
end_time = time.perf_counter()
print(f"conversation_dialogue_count time: {end_time - start_time}")
# trace start time
start_time = time.perf_counter()
# Create a variable pool.
system_inputs = {
SystemVariableKey.QUERY: query,
@ -151,6 +162,10 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
# init graph
graph = self._init_graph(graph_config=workflow.graph_dict)
# trace end time
end_time = time.perf_counter()
print(f"init graph time: {end_time - start_time}")
db.session.close()
# RUN WORKFLOW

View File

@ -15,7 +15,6 @@ from core.app.entities.queue_entities import (
QueuePingEvent,
QueueStopEvent,
)
from extensions.ext_redis import redis_client
class PublishFrom(Enum):
@ -32,10 +31,10 @@ class AppQueueManager:
self._user_id = user_id
self._invoke_from = invoke_from
user_prefix = "account" if self._invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} else "end-user"
redis_client.setex(
AppQueueManager._generate_task_belong_cache_key(self._task_id), 1800, f"{user_prefix}-{self._user_id}"
)
# user_prefix = "account" if self._invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} else "end-user"
# redis_client.setex(
# AppQueueManager._generate_task_belong_cache_key(self._task_id), 1800, f"{user_prefix}-{self._user_id}"
# )
q = queue.Queue()
@ -114,26 +113,27 @@ class AppQueueManager:
Set task stop flag
:return:
"""
result = redis_client.get(cls._generate_task_belong_cache_key(task_id))
if result is None:
return
return
# result = redis_client.get(cls._generate_task_belong_cache_key(task_id))
# if result is None:
# return
user_prefix = "account" if invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} else "end-user"
if result.decode("utf-8") != f"{user_prefix}-{user_id}":
return
# user_prefix = "account" if invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} else "end-user"
# if result.decode("utf-8") != f"{user_prefix}-{user_id}":
# return
stopped_cache_key = cls._generate_stopped_cache_key(task_id)
redis_client.setex(stopped_cache_key, 600, 1)
# stopped_cache_key = cls._generate_stopped_cache_key(task_id)
# redis_client.setex(stopped_cache_key, 600, 1)
def _is_stopped(self) -> bool:
"""
Check if task is stopped
:return:
"""
stopped_cache_key = AppQueueManager._generate_stopped_cache_key(self._task_id)
result = redis_client.get(stopped_cache_key)
if result is not None:
return True
# stopped_cache_key = AppQueueManager._generate_stopped_cache_key(self._task_id)
# result = redis_client.get(stopped_cache_key)
# if result is not None:
# return True
return False

View File

@ -1,8 +1,9 @@
import json
import logging
from collections.abc import Generator
import uuid
from collections.abc import Generator, Mapping
from datetime import datetime, timezone
from typing import Optional, Union
from typing import Any, Optional, Union
from sqlalchemy import and_
@ -137,6 +138,7 @@ class MessageBasedAppGenerator(BaseAppGenerator):
AdvancedChatAppGenerateEntity,
],
conversation: Optional[Conversation] = None,
override_model_configs: Optional[Mapping[str, Any]] = None,
) -> tuple[Conversation, Message]:
"""
Initialize generate records
@ -158,14 +160,12 @@ class MessageBasedAppGenerator(BaseAppGenerator):
if isinstance(application_generate_entity, AdvancedChatAppGenerateEntity):
app_model_config_id = None
override_model_configs = None
model_provider = None
model_id = None
else:
app_model_config_id = app_config.app_model_config_id
model_provider = application_generate_entity.model_conf.provider
model_id = application_generate_entity.model_conf.model
override_model_configs = None
if app_config.app_model_config_from == EasyUIBasedAppModelConfigFrom.ARGS and app_config.app_mode in {
AppMode.AGENT_CHAT,
AppMode.CHAT,
@ -177,61 +177,63 @@ class MessageBasedAppGenerator(BaseAppGenerator):
introduction = self._get_conversation_introduction(application_generate_entity)
if not conversation:
conversation = Conversation(
app_id=app_config.app_id,
app_model_config_id=app_model_config_id,
model_provider=model_provider,
model_id=model_id,
override_model_configs=json.dumps(override_model_configs) if override_model_configs else None,
mode=app_config.app_mode.value,
name="New conversation",
inputs=application_generate_entity.inputs,
introduction=introduction,
system_instruction="",
system_instruction_tokens=0,
status="normal",
invoke_from=application_generate_entity.invoke_from.value,
from_source=from_source,
from_end_user_id=end_user_id,
from_account_id=account_id,
)
with db.Session(bind=db.engine, expire_on_commit=False) as session:
conversation = Conversation()
conversation.id = str(uuid.uuid4())
conversation.app_id = app_config.app_id
conversation.app_model_config_id = app_model_config_id
conversation.model_provider = model_provider
conversation.model_id = model_id
conversation.override_model_configs = (
json.dumps(override_model_configs) if override_model_configs else None
)
conversation.mode = app_config.app_mode.value
conversation.name = "New conversation"
conversation.inputs = application_generate_entity.inputs
conversation.introduction = introduction
conversation.system_instruction = ""
conversation.system_instruction_tokens = 0
conversation.status = "normal"
conversation.invoke_from = application_generate_entity.invoke_from.value
conversation.from_source = from_source
conversation.from_end_user_id = end_user_id
conversation.from_account_id = account_id
db.session.add(conversation)
db.session.commit()
db.session.refresh(conversation)
session.add(conversation)
session.commit()
session.refresh(conversation)
else:
conversation.updated_at = datetime.now(timezone.utc).replace(tzinfo=None)
db.session.commit()
message = Message(
app_id=app_config.app_id,
model_provider=model_provider,
model_id=model_id,
override_model_configs=json.dumps(override_model_configs) if override_model_configs else None,
conversation_id=conversation.id,
inputs=application_generate_entity.inputs,
query=application_generate_entity.query or "",
message="",
message_tokens=0,
message_unit_price=0,
message_price_unit=0,
answer="",
answer_tokens=0,
answer_unit_price=0,
answer_price_unit=0,
parent_message_id=getattr(application_generate_entity, "parent_message_id", None),
provider_response_latency=0,
total_price=0,
currency="USD",
invoke_from=application_generate_entity.invoke_from.value,
from_source=from_source,
from_end_user_id=end_user_id,
from_account_id=account_id,
)
with db.Session(bind=db.engine, expire_on_commit=False) as session:
message = Message()
message.app_id = app_config.app_id
message.model_provider = model_provider
message.model_id = model_id
message.override_model_configs = json.dumps(override_model_configs) if override_model_configs else None
message.conversation_id = conversation.id
message.inputs = application_generate_entity.inputs
message.query = application_generate_entity.query or ""
message.message = ""
message.message_tokens = 0
message.message_unit_price = 0
message.answer = ""
message.answer_tokens = 0
message.answer_unit_price = 0
message.answer_price_unit = 0
message.parent_message_id = getattr(application_generate_entity, "parent_message_id", None)
message.provider_response_latency = 0
message.total_price = 0
message.currency = "USD"
message.invoke_from = application_generate_entity.invoke_from.value
message.from_source = from_source
message.from_end_user_id = end_user_id
message.from_account_id = account_id
db.session.add(message)
db.session.commit()
db.session.refresh(message)
session.add(message)
session.commit()
session.refresh(message)
for file in application_generate_entity.files:
message_file = MessageFile(