Skip to content

Commit

Permalink
fix: 修改数据查询方式 --story=121721246
Browse files Browse the repository at this point in the history
  • Loading branch information
guohelu committed Feb 8, 2025
1 parent 45405f9 commit cc45447
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 18 deletions.
16 changes: 6 additions & 10 deletions gcloud/contrib/cleaner/pipeline/bamboo_engine_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@

from gcloud.utils.data_handler import chunk_data
from gcloud.contrib.cleaner.models import ArchivedTaskInstance
from gcloud.taskflow3.models import TaskFlowInstance
from gcloud.taskflow3.models import AutoRetryNodeStrategy, TimeoutNodeConfig
from pipeline_web.core.models import NodeInInstance

Expand Down Expand Up @@ -115,18 +114,15 @@ def get_clean_pipeline_instance_data(instance_ids: List[str]) -> Dict[str, Query
}


def generate_archived_task_instances(expire_pipeline_instance_ids):
def generate_archived_task_instances(tasks):
"""
根据 instance_id 将过期任务的数据进行归档
生成归档任务实例
:param tasks: 待归档的过期任务
:return: List[ArchivedTaskInstance], List[int]
"""
archived_task_instances = []
archived_task_ids = []
try:
tasks = (
TaskFlowInstance.objects.select_related("pipeline_instance")
.filter(pipeline_instance__instance_id__in=expire_pipeline_instance_ids)
.order_by("id")
)
archived_task_instances = []
archived_task_ids = []
for task in tasks:
if task.is_deleted:
continue
Expand Down
14 changes: 6 additions & 8 deletions gcloud/contrib/cleaner/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,17 +104,15 @@ def archive_expired_v2_task_data():
batch_num = settings.ARCHIVE_EXPIRED_V2_TASK_BATCH_NUM

tasks = (
TaskFlowInstance.objects.filter(
pipeline_instance__create_time__lt=expire_time, engine_ver=2, pipeline_instance__is_expired=1
)
.order_by("id")
.values("id", "pipeline_instance__instance_id")[:batch_num]
TaskFlowInstance.objects.select_related("pipeline_instance")
.filter(pipeline_instance__create_time__lt=expire_time, engine_ver=2, pipeline_instance__is_expired=1)
.order_by("id")[:batch_num]
)
task_ids = [item["id"] for item in tasks]
pipeline_instance_ids = [item["pipeline_instance__instance_id"] for item in tasks]
task_ids = [item.id for item in tasks]
pipeline_instance_ids = [item.pipeline_instance.instance_id for item in tasks]

with transaction.atomic():
archived_task_instances, archived_task_ids = generate_archived_task_instances(pipeline_instance_ids)
archived_task_instances, archived_task_ids = generate_archived_task_instances(tasks)
if archived_task_instances and archived_task_ids:
ArchivedTaskInstance.objects.bulk_create(archived_task_instances)
logger.info(f"[generate_archived_task_instances] generate archived tasks, ids: {archived_task_ids}")
Expand Down

0 comments on commit cc45447

Please sign in to comment.