From 80a94dd390b794560d39af91442562022f9584af Mon Sep 17 00:00:00 2001 From: Bowen Liang Date: Wed, 25 Dec 2024 18:01:28 +0800 Subject: [PATCH] update --- api/configs/feature/__init__.py | 5 ++ api/schedule/clean_messages.py | 90 +++++++++++++++++++-------------- 2 files changed, 57 insertions(+), 38 deletions(-) diff --git a/api/configs/feature/__init__.py b/api/configs/feature/__init__.py index 74cdf944865796..bb70426de421e9 100644 --- a/api/configs/feature/__init__.py +++ b/api/configs/feature/__init__.py @@ -645,6 +645,11 @@ class DataSetConfig(BaseSettings): default=30, ) + PLAN_SANDBOX_CLEAN_MESSAGE_BATCH_SIZE: PositiveInt = Field( + description="Batch size for message cleanup operations", + default=200, + ) + class WorkspaceConfig(BaseSettings): """ diff --git a/api/schedule/clean_messages.py b/api/schedule/clean_messages.py index 48bdc872f41e5c..85f25f0784847b 100644 --- a/api/schedule/clean_messages.py +++ b/api/schedule/clean_messages.py @@ -1,4 +1,5 @@ import datetime +import logging import time import click @@ -25,58 +26,71 @@ def clean_messages(): click.echo(click.style("Start clean messages.", fg="green")) start_at = time.perf_counter() - plan_sandbox_clean_message_day = datetime.datetime.now() - datetime.timedelta( + upper_message_time = datetime.datetime.now() - datetime.timedelta( days=dify_config.PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING ) - page = 1 + deleted_messages_count = 0 while True: try: # Main query with join and filter # FIXME:for mypy no paginate method error messages = ( db.session.query(Message) # type: ignore - .filter(Message.created_at < plan_sandbox_clean_message_day) - .order_by(Message.created_at.desc()) - .limit(100) + .filter(Message.created_at < upper_message_time) + .order_by(Message.created_at.asc()) + .limit(dify_config.PLAN_SANDBOX_CLEAN_MESSAGE_BATCH_SIZE) .all() ) - except NotFound: break if not messages: break + for message in messages: - plan_sandbox_clean_message_day = message.created_at - app = App.query.filter_by(id=message.app_id).first() - features_cache_key = f"features:{app.tenant_id}" - plan_cache = redis_client.get(features_cache_key) - if plan_cache is None: - features = FeatureService.get_features(app.tenant_id) - redis_client.setex(features_cache_key, 600, features.billing.subscription.plan) - plan = features.billing.subscription.plan - else: - plan = plan_cache.decode() + plan = determine_plan(message) if plan == "sandbox": - # clean related message - db.session.query(MessageFeedback).filter(MessageFeedback.message_id == message.id).delete( - synchronize_session=False - ) - db.session.query(MessageAnnotation).filter(MessageAnnotation.message_id == message.id).delete( - synchronize_session=False - ) - db.session.query(MessageChain).filter(MessageChain.message_id == message.id).delete( - synchronize_session=False - ) - db.session.query(MessageAgentThought).filter(MessageAgentThought.message_id == message.id).delete( - synchronize_session=False - ) - db.session.query(MessageFile).filter(MessageFile.message_id == message.id).delete( - synchronize_session=False - ) - db.session.query(SavedMessage).filter(SavedMessage.message_id == message.id).delete( - synchronize_session=False - ) - db.session.query(Message).filter(Message.id == message.id).delete() - db.session.commit() + is_delete_success = delete_single_message(message.id) + if is_delete_success: + deleted_messages_count = deleted_messages_count + 1 end_at = time.perf_counter() - click.echo(click.style("Cleaned unused dataset from db success latency: {}".format(end_at - start_at), fg="green")) + click.echo( + click.style( + f"Cleaned outdated messages from db success latency: {end_at - start_at}," + f" deleted messages count: {deleted_messages_count}", + fg="green", + ) + ) + + +def determine_plan(message) -> str: + app = App.query.filter_by(id=message.app_id).first() + features_cache_key = f"features:{app.tenant_id}" + plan_cache = redis_client.get(features_cache_key) + if plan_cache is None: + features = FeatureService.get_features(app.tenant_id) + redis_client.setex(features_cache_key, 600, features.billing.subscription.plan) + return features.billing.subscription.plan + else: + return plan_cache.decode() + + +def delete_single_message(message_id: str): + try: + db.session.query(MessageFeedback).filter(MessageFeedback.message_id == message_id).delete( + synchronize_session=False + ) + db.session.query(MessageAnnotation).filter(MessageAnnotation.message_id == message_id).delete( + synchronize_session=False + ) + db.session.query(MessageChain).filter(MessageChain.message_id == message_id).delete(synchronize_session=False) + db.session.query(MessageAgentThought).filter(MessageAgentThought.message_id == message_id).delete( + synchronize_session=False + ) + db.session.query(MessageFile).filter(MessageFile.message_id == message_id).delete(synchronize_session=False) + db.session.query(SavedMessage).filter(SavedMessage.message_id == message_id).delete(synchronize_session=False) + db.session.query(Message).filter(Message.id == message_id).delete() + db.session.commit() + return True + except Exception: + logging.exception(f"Failed to delete message {message_id}") + return False