diff --git a/config/default.py b/config/default.py index ec23e4abc4..459daa33be 100644 --- a/config/default.py +++ b/config/default.py @@ -781,6 +781,7 @@ def monitor_report_config(): CLEAN_EXPIRED_V2_TASK_CRON = env.CLEAN_EXPIRED_V2_TASK_CRON V2_TASK_VALIDITY_DAY = env.V2_TASK_VALIDITY_DAY CLEAN_EXPIRED_V2_TASK_BATCH_NUM = env.CLEAN_EXPIRED_V2_TASK_BATCH_NUM +CLEAN_EXPIRED_V2_TASK_NODE_BATCH_NUM = env.CLEAN_EXPIRED_V2_TASK_NODE_BATCH_NUM CLEAN_EXPIRED_V2_TASK_INSTANCE = env.CLEAN_EXPIRED_V2_TASK_INSTANCE CLEAN_EXPIRED_V2_TASK_CREATE_METHODS = env.CLEAN_EXPIRED_V2_TASK_CREATE_METHODS CLEAN_EXPIRED_V2_TASK_PROJECTS = env.CLEAN_EXPIRED_V2_TASK_PROJECTS diff --git a/env.py b/env.py index d32e5817a8..14c5479526 100644 --- a/env.py +++ b/env.py @@ -92,6 +92,7 @@ CLEAN_EXPIRED_V2_TASK_CRON = tuple(os.getenv("BKAPP_CLEAN_EXPIRED_V2_TASK_CRON", "30 0 * * *").split()) V2_TASK_VALIDITY_DAY = int(os.getenv("BKAPP_V2_TASK_VALIDITY_DAY", 730)) CLEAN_EXPIRED_V2_TASK_BATCH_NUM = int(os.getenv("BKAPP_CLEAN_EXPIRED_V2_TASK_BATCH_NUM", 100)) +CLEAN_EXPIRED_V2_TASK_NODE_BATCH_NUM = int(os.getenv("BKAPP_CLEAN_EXPIRED_V2_TASK_NODE_BATCH_NUM", 5000)) CLEAN_EXPIRED_V2_TASK_INSTANCE = bool(os.getenv("BKAPP_CLEAN_EXPIRED_V2_TASK_INSTANCE", False)) CLEAN_EXPIRED_V2_TASK_CREATE_METHODS = os.getenv("BKAPP_CLEAN_EXPIRED_V2_TASK_CREATE_METHODS", "periodic").split(",") # 没有配置则默认清除所有项目 diff --git a/gcloud/contrib/cleaner/pipeline/bamboo_engine_tasks.py b/gcloud/contrib/cleaner/pipeline/bamboo_engine_tasks.py index 7677c71e5e..4b8f1fa583 100644 --- a/gcloud/contrib/cleaner/pipeline/bamboo_engine_tasks.py +++ b/gcloud/contrib/cleaner/pipeline/bamboo_engine_tasks.py @@ -10,27 +10,31 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. 
""" -from typing import List, Dict +import logging +from typing import Dict, List +from django.conf import settings from django.db.models import QuerySet - from pipeline.contrib.periodic_task.models import PeriodicTaskHistory from pipeline.eri.models import ( - ContextValue, + CallbackData, ContextOutputs, - Process, - Node, + ContextValue, Data, - State, - ExecutionHistory, ExecutionData, - CallbackData, + ExecutionHistory, + Node, + Process, Schedule, + State, ) +from pipeline.models import PipelineInstance, Snapshot, TreeInfo -from pipeline.models import PipelineInstance, TreeInfo, Snapshot +from gcloud.utils.data_handler import chunk_data from pipeline_web.core.models import NodeInInstance +logger = logging.getLogger("root") + def get_clean_pipeline_instance_data(instance_ids: List[str]) -> Dict[str, QuerySet]: """ @@ -48,19 +52,24 @@ def get_clean_pipeline_instance_data(instance_ids: List[str]) -> Dict[str, Query execution_snapshot = Snapshot.objects.filter(id__in=list(execution_snapshot_ids)) pipeline_ids = instance_ids + logger.info( + f"[get_clean_pipeline_instance_data] fetching pipeline_ids number: {pipeline_ids}, e.x.:{pipeline_ids[:3]}..." 
+ ) context_value = ContextValue.objects.filter(pipeline_id__in=pipeline_ids) context_outputs = ContextOutputs.objects.filter(pipeline_id__in=pipeline_ids) process = Process.objects.filter(root_pipeline_id__in=pipeline_ids) periodic_task_history = PeriodicTaskHistory.objects.filter(pipeline_instance_id__in=pipeline_ids) node_ids = list(nodes_in_pipeline.values_list("node_id", flat=True)) + instance_ids - nodes = Node.objects.filter(node_id__in=node_ids) - data = Data.objects.filter(node_id__in=node_ids) - states = State.objects.filter(node_id__in=node_ids) - execution_history = ExecutionHistory.objects.filter(node_id__in=node_ids) - execution_data = ExecutionData.objects.filter(node_id__in=node_ids) - callback_data = CallbackData.objects.filter(node_id__in=node_ids) - schedules = Schedule.objects.filter(node_id__in=node_ids) + logger.info(f"[get_clean_pipeline_instance_data] fetching node_ids number: {len(node_ids)}, e.x.:{node_ids[:3]}...") + chunk_size = settings.CLEAN_EXPIRED_V2_TASK_NODE_BATCH_NUM + nodes_list = chunk_data(node_ids, chunk_size, lambda x: Node.objects.filter(node_id__in=x)) + data_list = chunk_data(node_ids, chunk_size, lambda x: Data.objects.filter(node_id__in=x)) + states_list = chunk_data(node_ids, chunk_size, lambda x: State.objects.filter(node_id__in=x)) + execution_history_list = chunk_data(node_ids, chunk_size, lambda x: ExecutionHistory.objects.filter(node_id__in=x)) + execution_data_list = chunk_data(node_ids, chunk_size, lambda x: ExecutionData.objects.filter(node_id__in=x)) + callback_data_list = chunk_data(node_ids, chunk_size, lambda x: CallbackData.objects.filter(node_id__in=x)) + schedules_list = chunk_data(node_ids, chunk_size, lambda x: Schedule.objects.filter(node_id__in=x)) return { "tree_info": tree_info, @@ -69,13 +78,13 @@ def get_clean_pipeline_instance_data(instance_ids: List[str]) -> Dict[str, Query "context_value": context_value, "context_outputs": context_outputs, "process": process, - "node": nodes, - "data": data, - 
"state": states, - "execution_history": execution_history, - "execution_data": execution_data, - "callback_data": callback_data, - "schedules": schedules, "periodic_task_history": periodic_task_history, "pipeline_instances": pipeline_instances, + "node_list": nodes_list, + "data_list": data_list, + "state_list": states_list, + "execution_history_list": execution_history_list, + "execution_data_list": execution_data_list, + "callback_data_list": callback_data_list, + "schedules_list": schedules_list, } diff --git a/gcloud/contrib/cleaner/tasks.py b/gcloud/contrib/cleaner/tasks.py index b0f6c5ee7b..fa8eb5af5f 100644 --- a/gcloud/contrib/cleaner/tasks.py +++ b/gcloud/contrib/cleaner/tasks.py @@ -64,9 +64,13 @@ def clean_expired_v2_task_data(): instance_fields = ["tasks", "pipeline_instances"] with transaction.atomic(): for field, qs in data_to_clean.items(): - if field not in instance_fields or settings.CLEAN_EXPIRED_V2_TASK_INSTANCE: + if field.endswith("_list") and isinstance(qs, list): + logger.info(f"[clean_expired_v2_task_data] clean field: {field}, {len(qs)} batch data") + [q.delete() for q in qs] + elif field not in instance_fields or settings.CLEAN_EXPIRED_V2_TASK_INSTANCE: logger.info( - f"[clean_expired_v2_task_data] clean field: {field}, qs ids: {qs.values_list('id', flat=True)}" + f"[clean_expired_v2_task_data] clean field: {field}, " + f"qs ids: {qs.values_list('id', flat=True)[:10]}..." ) qs.delete() elif field == "pipeline_instances":