Merge branch 'fix/chore-fix' into dev/plugin-deploy

This commit is contained in:
Yeuoly 2024-12-26 22:30:38 +08:00
commit 673ba9330c
2 changed files with 47 additions and 21 deletions

View File

@ -661,13 +661,14 @@ def migrate_data_for_plugin():
@click.command("extract-plugins", help="Extract plugins.") @click.command("extract-plugins", help="Extract plugins.")
@click.option("--output_file", prompt=True, help="The file to store the extracted plugins.") @click.option("--output_file", prompt=True, help="The file to store the extracted plugins.", default="plugins.jsonl")
def extract_plugins(output_file: str): @click.option("--workers", prompt=True, help="The number of workers to extract plugins.", default=10)
def extract_plugins(output_file: str, workers: int):
""" """
Extract plugins. Extract plugins.
""" """
click.echo(click.style("Starting extract plugins.", fg="white")) click.echo(click.style("Starting extract plugins.", fg="white"))
PluginMigration.extract_plugins(output_file) PluginMigration.extract_plugins(output_file, workers)
click.echo(click.style("Extract plugins completed.", fg="green")) click.echo(click.style("Extract plugins completed.", fg="green"))

View File

@ -4,6 +4,7 @@ import logging
from collections.abc import Sequence from collections.abc import Sequence
import click import click
from flask import Flask, current_app
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from core.agent.entities import AgentToolEntity from core.agent.entities import AgentToolEntity
@ -22,10 +23,13 @@ excluded_providers = ["time", "audio", "code", "webscraper"]
class PluginMigration: class PluginMigration:
@classmethod @classmethod
def extract_plugins(cls, filepath: str) -> None: def extract_plugins(cls, filepath: str, workers: int) -> None:
""" """
Migrate plugin. Migrate plugin.
""" """
import concurrent.futures
from threading import Lock
click.echo(click.style("Migrating models/tools to new plugin Mechanism", fg="white")) click.echo(click.style("Migrating models/tools to new plugin Mechanism", fg="white"))
ended_at = datetime.datetime.now() ended_at = datetime.datetime.now()
started_at = datetime.datetime(2023, 4, 3, 8, 59, 24) started_at = datetime.datetime(2023, 4, 3, 8, 59, 24)
@ -34,9 +38,42 @@ class PluginMigration:
with Session(db.engine) as session: with Session(db.engine) as session:
total_tenant_count = session.query(Tenant.id).count() total_tenant_count = session.query(Tenant.id).count()
click.echo(click.style(f"Total tenant count: {total_tenant_count}", fg="white"))
handled_tenant_count = 0 handled_tenant_count = 0
file_lock = Lock()
counter_lock = Lock()
thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=workers)
def process_tenant(flask_app: Flask, tenant_id: str) -> None:
with flask_app.app_context():
nonlocal handled_tenant_count
try:
plugins = cls.extract_installed_plugin_ids(tenant_id)
# Use lock when writing to file
with file_lock:
with open(filepath, "a") as f:
f.write(json.dumps({"tenant_id": tenant_id, "plugins": plugins}) + "\n")
# Use lock when updating counter
with counter_lock:
nonlocal handled_tenant_count
handled_tenant_count += 1
click.echo(
click.style(
f"[{datetime.datetime.now()}] "
f"Processed {handled_tenant_count} tenants "
f"({(handled_tenant_count / total_tenant_count) * 100:.1f}%), "
f"{handled_tenant_count}/{total_tenant_count}",
fg="green",
)
)
except Exception:
logger.exception(f"Failed to process tenant {tenant_id}")
while current_time < ended_at: while current_time < ended_at:
click.echo(click.style(f"Current time: {current_time}, Started at: {datetime.datetime.now()}", fg="white"))
# Initial interval of 1 day, will be dynamically adjusted based on tenant count # Initial interval of 1 day, will be dynamically adjusted based on tenant count
interval = datetime.timedelta(days=1) interval = datetime.timedelta(days=1)
# Process tenants in this batch # Process tenants in this batch
@ -84,7 +121,6 @@ class PluginMigration:
) )
tenants = [] tenants = []
for row in rs: for row in rs:
tenant_id = str(row.id) tenant_id = str(row.id)
try: try:
@ -93,25 +129,14 @@ class PluginMigration:
logger.exception(f"Failed to process tenant {tenant_id}") logger.exception(f"Failed to process tenant {tenant_id}")
continue continue
for tenant_id in tenants: # Process batch with thread pool
plugins = cls.extract_installed_plugin_ids(tenant_id) thread_pool.map(lambda tenant_id: process_tenant(current_app, tenant_id), tenants)
# append to file, it's a jsonl file
with open(filepath, "a") as f:
f.write(json.dumps({"tenant_id": tenant_id, "plugins": plugins}) + "\n")
handled_tenant_count += len(tenants)
click.echo(
click.style(
f"Processed {handled_tenant_count} tenants "
f"({(handled_tenant_count / total_tenant_count) * 100:.1f}%), "
f"{handled_tenant_count}/{total_tenant_count}",
fg="green",
)
)
current_time = batch_end current_time = batch_end
# wait for all threads to finish
thread_pool.shutdown(wait=True)
@classmethod @classmethod
def extract_installed_plugin_ids(cls, tenant_id: str) -> Sequence[str]: def extract_installed_plugin_ids(cls, tenant_id: str) -> Sequence[str]:
""" """