Skip to content

Commit

Permalink
fix(mysql): spider扩容单据修复 #9301
Browse files Browse the repository at this point in the history
  • Loading branch information
yksitu authored and iSecloud committed Feb 14, 2025
1 parent 4d1b03c commit ed11c8f
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
init_machine_sub_flow,
update_machine_system_info_flow,
)
from backend.flow.engine.bamboo.scene.spider.common.exceptions import AddSpiderNodeFailedException
from backend.flow.plugins.components.collections.common.delete_cc_service_instance import DelCCServiceInstComponent
from backend.flow.plugins.components.collections.common.download_backup_client import DownloadBackupClientComponent
from backend.flow.plugins.components.collections.mysql.clear_machine import MySQLClearMachineComponent
Expand Down Expand Up @@ -69,6 +70,7 @@ def add_spider_slaves_sub_flow(
add_spider_slaves: list,
root_id: str,
parent_global_data: dict,
is_clone_user: bool = True,
):
"""
定义对原有的TenDB cluster集群添加spider slave节点的公共子流程
Expand All @@ -79,11 +81,23 @@ def add_spider_slaves_sub_flow(
@param root_id: flow流程的root_id
@param parent_global_data: 本次子流程的对应上层流程的全局只读上下文
@param uid: 单据id
@param is_clone_user 是否克隆权限
"""
tdbctl_pass = get_random_string(length=10)

# 获取到集群对应的spider端口,作为这次的安装
parent_global_data["spider_ports"] = [cluster.proxyinstance_set.first().port]
# 获取模板spider,先更加spider_slave获取,如果不存在,则根据spider_master获取
spiders = cluster.proxyinstance_set.filter(
status=InstanceStatus.RUNNING, tendbclusterspiderext__spider_role=TenDBClusterSpiderRole.SPIDER_SLAVE
)
if spiders:
tmp_spider = spiders[0]
else:
tmp_spider = cluster.proxyinstance_set.filter(
status=InstanceStatus.RUNNING, tendbclusterspiderext__spider_role=TenDBClusterSpiderRole.SPIDER_MASTER
)[0]

parent_global_data["spider_ports"] = [tmp_spider.port]
parent_global_data["db_module_id"] = cluster.db_module_id
sub_pipeline = SubBuilder(root_id=root_id, data=parent_global_data)

Expand Down Expand Up @@ -151,32 +165,30 @@ def add_spider_slaves_sub_flow(
)
sub_pipeline.add_parallel_acts(acts_list=acts_list)

# 阶段5 集群的业务账号信息克隆到新的spider实例上, 因为目前spider中控实例无法有克隆权限的操作,只能在这里做
acts_list = []

# 这里获取集群内running状态的spider节点作为这次克隆权限的依据
tmp_spider = cluster.proxyinstance_set.filter(status=InstanceStatus.RUNNING)[0]

for spider in add_spider_slaves:
acts_list.append(
{
"act_name": _("克隆权限到spider节点[{}]".format(spider["ip"])),
"act_component_code": CloneUserComponent.code,
"kwargs": asdict(
InstanceUserCloneKwargs(
clone_data=[
{
"source": tmp_spider.ip_port,
"target": f"{spider['ip']}{AUTH_ADDRESS_DIVIDER}{tmp_spider.port}",
"machine_type": MachineType.SPIDER.value,
"bk_cloud_id": cluster.bk_cloud_id,
},
],
)
),
}
)
sub_pipeline.add_parallel_acts(acts_list=acts_list)
if is_clone_user:
# 阶段5 集群的业务账号信息克隆到新的spider实例上, 因为目前spider中控实例无法有克隆权限的操作,只能在这里做
# 如果是slave集群部署,则不需要克隆权限, 应该is_clone_user=False
acts_list = []
for spider in add_spider_slaves:
acts_list.append(
{
"act_name": _("克隆权限到spider节点[{}->{}]".format(tmp_spider.machine.ip, spider["ip"])),
"act_component_code": CloneUserComponent.code,
"kwargs": asdict(
InstanceUserCloneKwargs(
clone_data=[
{
"source": tmp_spider.ip_port,
"target": f"{spider['ip']}{AUTH_ADDRESS_DIVIDER}{tmp_spider.port}",
"machine_type": MachineType.SPIDER.value,
"bk_cloud_id": cluster.bk_cloud_id,
},
],
)
),
}
)
sub_pipeline.add_parallel_acts(acts_list=acts_list)

# 阶段6 下发actor,并执行spider-slave 路由初始化
sub_pipeline.add_act(
Expand Down Expand Up @@ -317,38 +329,50 @@ def add_spider_masters_sub_flow(
)
sub_pipeline.add_parallel_acts(acts_list=acts_list)

# 阶段6 集群的业务账号信息克隆到新的spider实例上, 因为目前spider中控实例无法有克隆权限的操作,只能在这里做
acts_list = []

# 这里获取集群内running状态的spider节点作为这次克隆权限的依据
tmp_spider = cluster.proxyinstance_set.filter(status=InstanceStatus.RUNNING)[0]

for spider in add_spider_masters:
acts_list.append(
{
"act_name": _("克隆权限到spider节点[{}]".format(spider["ip"])),
"act_component_code": CloneUserComponent.code,
"kwargs": asdict(
InstanceUserCloneKwargs(
clone_data=[
{
"source": tmp_spider.ip_port,
"target": f"{spider['ip']}{AUTH_ADDRESS_DIVIDER}{tmp_spider.port}",
"machine_type": MachineType.SPIDER.value,
"bk_cloud_id": cluster.bk_cloud_id,
},
],
)
),
}
)
sub_pipeline.add_parallel_acts(acts_list=acts_list)

# 阶段7 执行spider-master 路由初始化, 内置账号密码随机生成,不需要平台来维护,避免误操作影响
if is_add_spider_mnt:
role = TenDBClusterSpiderRole.SPIDER_MNT.value
else:
role = TenDBClusterSpiderRole.SPIDER_MASTER.value

spiders = cluster.proxyinstance_set.filter(status=InstanceStatus.RUNNING, tendbclusterspiderext__spider_role=role)
if not spiders and role == TenDBClusterSpiderRole.SPIDER_MNT.value:
# 如果添加节点的角色是spider_mnt, 但是集群没有相应的角色,作为第一次添加,不做权限克隆
is_clone_user = False
tmp_spider = None
elif spiders:
# 正常获取第一个节点做模板
is_clone_user = True
tmp_spider = spiders[0]
else:
# 如果spiders没有返回,则构建流程失败
raise AddSpiderNodeFailedException(message=_("构建流程失败,集群找不到相应角色[{}]的spider实例作为权限克隆的来源".format(role)))

if is_clone_user:
# 阶段6 集群的业务账号信息克隆到新的spider实例上, 因为目前spider中控实例无法有克隆权限的操作,只能在这里做
acts_list = []
for spider in add_spider_masters:
acts_list.append(
{
"act_name": _("克隆权限到spider节点[{}->{}]".format(tmp_spider.machine.ip, spider["ip"])),
"act_component_code": CloneUserComponent.code,
"kwargs": asdict(
InstanceUserCloneKwargs(
clone_data=[
{
"source": tmp_spider.ip_port,
"target": f"{spider['ip']}{AUTH_ADDRESS_DIVIDER}{tmp_spider.port}",
"machine_type": MachineType.SPIDER.value,
"bk_cloud_id": cluster.bk_cloud_id,
},
],
)
),
}
)
sub_pipeline.add_parallel_acts(acts_list=acts_list)

# 阶段7 执行spider-master 路由初始化, 内置账号密码随机生成,不需要平台来维护,避免误操作影响
sub_pipeline.add_act(
act_name=_("添加对应路由关系"),
act_component_code=AddSpiderRoutingComponent.code,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ def deploy_slave_cluster(self):
uid=self.data["uid"],
parent_global_data=copy.deepcopy(sub_flow_context),
slave_domain=info["slave_domain"],
is_clone_user=False,
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,15 @@ def _check_node_is_add(self, spider_ip: str, spider_port: int, cluster: Cluster)
def _exec_create_node(self, cluster: Cluster, user: str, passwd: str, spider_ip: str, spider_port: int, tag: str):
"""
定义通过中控master添加node的公共方法
因为添加节点时候,需要导出导入表结构,如果碰到多表集群容易超时,所以timeout设置12小时。
"""
cmds = ["set tc_admin=1"]
rpc_params = {
"addresses": [cluster.tendbcluster_ctl_primary_address()],
"cmds": cmds,
"force": False,
"bk_cloud_id": cluster.bk_cloud_id,
"query_timeout": 43200,
}

if not self._check_node_is_add(cluster=cluster, spider_ip=spider_ip, spider_port=spider_port):
Expand Down

0 comments on commit ed11c8f

Please sign in to comment.