Skip to content

Commit

Permalink
Merge branch 'master' into feat/master_field_alias/#1010158081121057197
Browse files Browse the repository at this point in the history
  • Loading branch information
zzz833708 authored Feb 6, 2025
2 parents 732c4f3 + 987977b commit 9907911
Show file tree
Hide file tree
Showing 212 changed files with 10,592 additions and 1,369 deletions.
14 changes: 14 additions & 0 deletions bklog/apps/log_databus/handlers/collector_scenario/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,20 @@ def get_built_in_config(cls, es_version="5.X", etl_config=EtlConfig.BK_LOG_TEXT)
"""
raise NotImplementedError()

@staticmethod
def get_unique_field_list(field_list: list, target_fields: list, sort_fields: list):
"""
获取唯一字段列表
:param field_list: 字段列表
:param target_fields: 定位字段
:param sort_fields: 排序字段
"""
if target_fields:
field_list.extend(target_fields)
if sort_fields:
field_list.extend(sort_fields)
return sorted(set(field_list))

@classmethod
def change_data_stream(
cls, collector_config: CollectorConfig, mq_topic: Optional[str] = None, mq_partition: int = 1
Expand Down
26 changes: 16 additions & 10 deletions bklog/apps/log_databus/handlers/collector_scenario/custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,33 @@
We undertake not to change the open source license (MIT license) applicable to the current version of
the project delivered to anyone in the future.
"""
from django.utils.translation import ugettext as _

from apps.log_databus.constants import EtlConfig
from apps.log_databus.handlers.collector_scenario.base import CollectorScenario
from django.utils.translation import ugettext as _


class CustomCollectorScenario(CollectorScenario):
@classmethod
def get_built_in_config(cls, es_version="5.X", etl_config=EtlConfig.BK_LOG_TEXT):
def get_built_in_config(cls, es_version="5.X", etl_config=EtlConfig.BK_LOG_TEXT, **kwargs):
"""
获取采集器标准字段
"""
unique_field_list = cls.get_unique_field_list(
field_list=[
"cloudId",
"serverIp",
"path",
"gseIndex",
"iterationIndex",
"dtEventTimeStamp",
],
target_fields=kwargs.get("target_fields"),
sort_fields=kwargs.get("sort_fields"),
)
return {
"option": {
"es_unique_field_list": [
"cloudId",
"serverIp",
"path",
"gseIndex",
"iterationIndex",
"dtEventTimeStamp",
],
"es_unique_field_list": unique_field_list,
"separator_node_source": "",
"separator_node_action": "",
"separator_node_name": "",
Expand Down
23 changes: 14 additions & 9 deletions bklog/apps/log_databus/handlers/collector_scenario/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,20 +224,25 @@ def parse_steps(cls, steps):
return params

@classmethod
def get_built_in_config(cls, es_version="5.X", etl_config=EtlConfig.BK_LOG_TEXT):
def get_built_in_config(cls, es_version="5.X", etl_config=EtlConfig.BK_LOG_TEXT, **kwargs):
"""
获取采集器标准字段
"""
unique_field_list = cls.get_unique_field_list(
field_list=[
"cloudId",
"serverIp",
"gseIndex",
"iterationIndex",
"bk_host_id",
"dtEventTimeStamp",
],
target_fields=kwargs.get("target_fields"),
sort_fields=kwargs.get("sort_fields"),
)
return {
"option": {
"es_unique_field_list": [
"cloudId",
"serverIp",
"gseIndex",
"iterationIndex",
"bk_host_id",
"dtEventTimeStamp",
],
"es_unique_field_list": unique_field_list,
"separator_node_source": "",
"separator_node_action": "",
"separator_node_name": "",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,20 +106,25 @@ def parse_steps(cls, steps):
return {"redis_hosts": []}

@classmethod
def get_built_in_config(cls, es_version="5.X", etl_config=EtlConfig.BK_LOG_TEXT):
def get_built_in_config(cls, es_version="5.X", etl_config=EtlConfig.BK_LOG_TEXT, **kwargs):
"""
获取采集器标准字段
"""
unique_field_list = cls.get_unique_field_list(
field_list=[
"cloudId",
"serverIp",
"gseIndex",
"iterationIndex",
"bk_host_id",
"dtEventTimeStamp",
],
target_fields=kwargs.get("target_fields"),
sort_fields=kwargs.get("sort_fields"),
)
return {
"option": {
"es_unique_field_list": [
"cloudId",
"serverIp",
"gseIndex",
"iterationIndex",
"bk_host_id",
"dtEventTimeStamp",
],
"es_unique_field_list": unique_field_list,
"separator_node_source": "",
"separator_node_action": "",
"separator_node_name": "",
Expand Down
25 changes: 15 additions & 10 deletions bklog/apps/log_databus/handlers/collector_scenario/row.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,21 +210,26 @@ def parse_steps(cls, steps):
return params

@classmethod
def get_built_in_config(cls, es_version="5.X", etl_config=EtlConfig.BK_LOG_TEXT):
def get_built_in_config(cls, es_version="5.X", etl_config=EtlConfig.BK_LOG_TEXT, **kwargs):
"""
获取采集器标准字段
"""
unique_field_list = cls.get_unique_field_list(
field_list=[
"cloudId",
"serverIp",
"path",
"gseIndex",
"iterationIndex",
"bk_host_id",
"dtEventTimeStamp",
],
target_fields=kwargs.get("target_fields"),
sort_fields=kwargs.get("sort_fields"),
)
return {
"option": {
"es_unique_field_list": [
"cloudId",
"serverIp",
"path",
"gseIndex",
"iterationIndex",
"bk_host_id",
"dtEventTimeStamp",
],
"es_unique_field_list": unique_field_list,
"separator_node_source": "",
"separator_node_action": "",
"separator_node_name": "",
Expand Down
25 changes: 15 additions & 10 deletions bklog/apps/log_databus/handlers/collector_scenario/section.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,21 +219,26 @@ def parse_steps(cls, steps):
return params

@classmethod
def get_built_in_config(cls, es_version="5.X", etl_config=EtlConfig.BK_LOG_TEXT):
def get_built_in_config(cls, es_version="5.X", etl_config=EtlConfig.BK_LOG_TEXT, **kwargs):
"""
获取采集器标准字段
"""
unique_field_list = cls.get_unique_field_list(
field_list=[
"cloudId",
"serverIp",
"path",
"gseIndex",
"iterationIndex",
"bk_host_id",
"dtEventTimeStamp",
],
target_fields=kwargs.get("target_fields"),
sort_fields=kwargs.get("sort_fields"),
)
return {
"option": {
"es_unique_field_list": [
"cloudId",
"serverIp",
"path",
"gseIndex",
"iterationIndex",
"bk_host_id",
"dtEventTimeStamp",
],
"es_unique_field_list": unique_field_list,
"separator_node_source": "",
"separator_node_action": "",
"separator_node_name": "",
Expand Down
25 changes: 14 additions & 11 deletions bklog/apps/log_databus/handlers/collector_scenario/syslog.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
We undertake not to change the open source license (MIT license) applicable to the current version of
the project delivered to anyone in the future.
"""
from collections import OrderedDict

from django.utils.translation import ugettext as _

from apps.log_databus.constants import EtlConfig, LogPluginInfo, PluginParamLogicOpEnum
Expand Down Expand Up @@ -126,20 +124,25 @@ def parse_steps(cls, steps):
return {"syslog_protocol": "", "syslog_port": 0, "syslog_conditions": []}

@classmethod
def get_built_in_config(cls, es_version="5.X", etl_config=EtlConfig.BK_LOG_TEXT):
def get_built_in_config(cls, es_version="5.X", etl_config=EtlConfig.BK_LOG_TEXT, **kwargs):
"""
获取采集器标准字段
"""
unique_field_list = cls.get_unique_field_list(
field_list=[
"cloudId",
"serverIp",
"gseIndex",
"iterationIndex",
"bk_host_id",
"dtEventTimeStamp",
],
target_fields=kwargs.get("target_fields"),
sort_fields=kwargs.get("sort_fields"),
)
built_in_config = {
"option": {
"es_unique_field_list": [
"cloudId",
"serverIp",
"gseIndex",
"iterationIndex",
"bk_host_id",
"dtEventTimeStamp",
],
"es_unique_field_list": unique_field_list,
"separator_node_source": "",
"separator_node_action": "",
"separator_node_name": "",
Expand Down
25 changes: 15 additions & 10 deletions bklog/apps/log_databus/handlers/collector_scenario/wineventlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,21 +123,26 @@ def parse_steps(cls, steps):
}

@classmethod
def get_built_in_config(cls, es_version="5.X", etl_config=EtlConfig.BK_LOG_TEXT):
def get_built_in_config(cls, es_version="5.X", etl_config=EtlConfig.BK_LOG_TEXT, **kwargs):
"""
获取采集器标准字段
"""
unique_field_list = cls.get_unique_field_list(
field_list=[
"cloudId",
"serverIp",
"winEventId",
"winEventChannel",
"winEventRecordId",
"bk_host_id",
"dtEventTimeStamp",
],
target_fields=kwargs.get("target_fields"),
sort_fields=kwargs.get("sort_fields"),
)
return {
"option": {
"es_unique_field_list": [
"cloudId",
"serverIp",
"winEventId",
"winEventChannel",
"winEventRecordId",
"bk_host_id",
"dtEventTimeStamp",
],
"es_unique_field_list": unique_field_list,
"separator_node_source": "",
"separator_node_action": "",
"separator_node_name": "",
Expand Down
7 changes: 7 additions & 0 deletions bklog/apps/log_databus/handlers/etl/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ def update_or_create(
# raise CollectorResultTableIDDuplicateException(
# CollectorResultTableIDDuplicateException.MESSAGE.format(result_table_id=table_id)
# )
# Backfill sort/target fields from the existing index set when the caller did
# not supply them explicitly.
index_set_obj = LogIndexSet.objects.filter(index_set_id=self.data.index_set_id).first()
if sort_fields is None and index_set_obj:
    sort_fields = index_set_obj.sort_fields
# BUG FIX: the second guard previously re-checked ``sort_fields is None``
# (copy-paste error), so ``target_fields`` was never backfilled.
if target_fields is None and index_set_obj:
    target_fields = index_set_obj.target_fields

# 1. meta-创建/修改结果表
etl_storage = EtlStorage.get_instance(etl_config=etl_config)
Expand All @@ -124,6 +129,8 @@ def update_or_create(
es_version=cluster_info["cluster_config"]["version"],
hot_warm_config=cluster_info["cluster_config"].get("custom_option", {}).get("hot_warm_config"),
es_shards=es_shards,
sort_fields=sort_fields,
target_fields=target_fields,
alias_settings=alias_settings,
)

Expand Down
12 changes: 11 additions & 1 deletion bklog/apps/log_databus/handlers/etl_storage/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,8 @@ def update_or_create_result_table(
hot_warm_config: dict = None,
es_shards: int = settings.ES_SHARDS,
index_settings: dict = None,
sort_fields: list = None,
target_fields: list = None,
alias_settings: list = None,
):
"""
Expand All @@ -442,6 +444,9 @@ def update_or_create_result_table(
:param es_shards: es分片数
:param index_settings: 索引配置
:param alias_settings: 别名配置
:param sort_fields: 排序字段
:param target_fields: 定位字段
:param alias_settings: 别名配置
"""
from apps.log_databus.handlers.collector import build_result_table_id

Expand Down Expand Up @@ -544,7 +549,12 @@ def update_or_create_result_table(

# 获取清洗配置
collector_scenario = CollectorScenario.get_instance(collector_scenario_id=instance.collector_scenario_id)
built_in_config = collector_scenario.get_built_in_config(es_version, self.etl_config)
built_in_config = collector_scenario.get_built_in_config(
es_version,
self.etl_config,
sort_fields=sort_fields,
target_fields=target_fields,
)
result_table_config = self.get_result_table_config(fields, etl_params, built_in_config, es_version=es_version)

# 添加元数据路径配置到结果表配置中
Expand Down
5 changes: 5 additions & 0 deletions bklog/apps/log_search/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,11 @@ class PreCheckAsyncExportException(BaseException):
MESSAGE = _("创建异步导出任务前置检查失败,请检查索引集字段配置")


class BKBaseExportException(BaseException):
    """Raised when a quick export is requested for a BKDATA (computing platform)
    index set, which does not support fast download.

    NOTE(review): ``BaseException`` here is presumably the project's own base
    exception class (as used by the sibling exception classes in this file),
    not the Python builtin — confirm against the module's imports.
    """

    ERROR_CODE = "505"
    # User-facing message: "computing-platform index sets do not yet support quick download"
    MESSAGE = _("计算平台索引集暂不支持快速下载")


# =================================================
# JWT
# =================================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@
IndexSetType,
)
from apps.log_search.exceptions import (
BKBaseExportException,
MissAsyncExportException,
PreCheckAsyncExportException,
)
from apps.log_search.handlers.search.search_handlers_esquery import SearchHandler
from apps.log_search.models import AsyncTask, LogIndexSet
from apps.log_search.models import AsyncTask, LogIndexSet, Scenario
from apps.log_search.tasks.async_export import async_export
from apps.models import model_to_dict
from apps.utils.db import array_chunk
Expand Down Expand Up @@ -84,6 +85,9 @@ def __init__(
self.export_file_type = export_file_type

def async_export(self, is_quick_export: bool = False):
# 计算平台暂不支持快速下载
if is_quick_export and self.search_handler.scenario_id == Scenario.BKDATA:
raise BKBaseExportException()
# 判断fields是否支持
fields = self._pre_check_fields()
# 获取排序字段
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1822,6 +1822,11 @@ def _init_indices_str(self, index_set_id: int) -> str:
)
)
self.origin_indices = ",".join(index_list)
self.custom_indices = self.search_dict.get("custom_indices")
if self.custom_indices and index_list:
self.origin_indices = ",".join(
_index for _index in self.custom_indices.split(",") if _index in index_list
)
self.origin_scenario_id = tmp_index_obj.scenario_id
for addition in self.search_dict.get("addition", []):
# 查询条件中包含__dist_xx 则查询聚类结果表:xxx_bklog_xxx_clustered
Expand Down
Loading

0 comments on commit 9907911

Please sign in to comment.