support remove node #69

Open · wants to merge 3 commits into main
2 changes: 2 additions & 0 deletions conf/config.properties
@@ -229,3 +229,5 @@ storageGroupValueLimit=200.0

# Whether parameters may be set via environment variables
enableEnvParameter=false

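# Size of the thread pool used when migrating data off a storage node
# (assumed consumer: the new StorageMigrationExecutor in this PR)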
migrationThreadPoolSize=20
28 changes: 28 additions & 0 deletions core/src/main/java/cn/edu/tsinghua/iginx/IginxWorker.java
@@ -35,6 +35,7 @@
import cn.edu.tsinghua.iginx.metadata.DefaultMetaManager;
import cn.edu.tsinghua.iginx.metadata.IMetaManager;
import cn.edu.tsinghua.iginx.metadata.entity.*;
import cn.edu.tsinghua.iginx.migration.storage.StorageMigrationExecutor;
import cn.edu.tsinghua.iginx.resource.QueryResourceManager;
import cn.edu.tsinghua.iginx.thrift.*;
import cn.edu.tsinghua.iginx.transform.exec.TransformJobManager;
@@ -258,6 +259,33 @@ public Status removeHistoryDataSource(RemoveHistoryDataSourceReq req) {
return status;
}

@Override
public Status removeStorageEngine(RemoveStorageEngineReq req) {
Collaborator: Removing a node and migrating its data is usually a long-running process; should the case where a user re-sends the same request be considered? (A hedged sketch of one possible guard follows this file's diff.)

if (!sessionManager.checkSession(req.getSessionId(), AuthType.Cluster)) {
return RpcUtils.ACCESS_DENY;
}
long storageId = req.getStorageId();
StorageEngineMeta storageEngine = metaManager.getStorageEngine(storageId);
if (storageEngine == null) {
Status status = new Status(StatusCode.STATEMENT_EXECUTION_ERROR.getStatusCode());
status.setMessage("storage engine does not exist.");
return status;
}
try {
if (StorageMigrationExecutor.getInstance().migration(storageId, req.sync, true)) {
return RpcUtils.SUCCESS;
}
Status status = new Status(StatusCode.STATEMENT_EXECUTION_ERROR.getStatusCode());
status.setMessage("unexpected error during storage migration");
return status;
} catch (Exception e) {
logger.error("unexpected error during storage migration: ", e);
Status status = new Status(StatusCode.STATEMENT_EXECUTION_ERROR.getStatusCode());
status.setMessage("unexpected error during storage migration: " + e.getMessage());
return status;
}
}

@Override
public Status addStorageEngines(AddStorageEnginesReq req) {
if (!sessionManager.checkSession(req.getSessionId(), AuthType.Cluster)) {
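A minimal sketch of one way to address the review question above: keep a concurrent set of storage ids whose removal is already in flight, and answer duplicates immediately instead of starting a second migration of the same node. The inFlightRemovals field is a hypothetical addition for illustration, not part of this PR; all other calls are taken from the diff above, and the sketch only covers the synchronous case.

// Hypothetical guard, not in this PR. Needs:
//   import java.util.Set;
//   import java.util.concurrent.ConcurrentHashMap;
private final Set<Long> inFlightRemovals = ConcurrentHashMap.newKeySet();

@Override
public Status removeStorageEngine(RemoveStorageEngineReq req) {
    if (!sessionManager.checkSession(req.getSessionId(), AuthType.Cluster)) {
        return RpcUtils.ACCESS_DENY;
    }
    long storageId = req.getStorageId();
    // add() returns false when the id is already present, i.e. a removal is running
    if (!inFlightRemovals.add(storageId)) {
        Status status = new Status(StatusCode.STATEMENT_EXECUTION_ERROR.getStatusCode());
        status.setMessage("removal of storage engine " + storageId + " is already in progress");
        return status;
    }
    try {
        StorageEngineMeta storageEngine = metaManager.getStorageEngine(storageId);
        if (storageEngine == null) {
            Status status = new Status(StatusCode.STATEMENT_EXECUTION_ERROR.getStatusCode());
            status.setMessage("storage engine does not exist.");
            return status;
        }
        if (StorageMigrationExecutor.getInstance().migration(storageId, req.sync, true)) {
            return RpcUtils.SUCCESS;
        }
        Status status = new Status(StatusCode.STATEMENT_EXECUTION_ERROR.getStatusCode());
        status.setMessage("unexpected error during storage migration");
        return status;
    } catch (Exception e) {
        logger.error("unexpected error during storage migration: ", e);
        Status status = new Status(StatusCode.STATEMENT_EXECUTION_ERROR.getStatusCode());
        status.setMessage("unexpected error during storage migration: " + e.getMessage());
        return status;
    } finally {
        // clear the marker so a completed or failed removal can be retried
        inFlightRemovals.remove(storageId);
    }
}

For an asynchronous request (req.sync == false) the marker would have to be cleared by the migration executor on completion rather than in finally, otherwise the guard window would end as soon as the call returns.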
10 changes: 10 additions & 0 deletions core/src/main/java/cn/edu/tsinghua/iginx/conf/Config.java
@@ -203,6 +203,8 @@ public class Config {

private int streamParallelGroupByWorkerNum = 5;

private int migrationThreadPoolSize = 20;

public int getMaxTimeseriesLength() {
return maxTimeseriesLength;
}
@@ -855,4 +857,12 @@ public int getStreamParallelGroupByWorkerNum() {
public void setStreamParallelGroupByWorkerNum(int streamParallelGroupByWorkerNum) {
this.streamParallelGroupByWorkerNum = streamParallelGroupByWorkerNum;
}

public int getMigrationThreadPoolSize() {
return migrationThreadPoolSize;
}

public void setMigrationThreadPoolSize(int migrationThreadPoolSize) {
this.migrationThreadPoolSize = migrationThreadPoolSize;
}
}
@@ -235,6 +235,8 @@ private void loadPropsFromFile() {
config.setStreamParallelGroupByWorkerNum(
Integer.parseInt(
properties.getProperty("streamParallelGroupByWorkerNum", "5")));
config.setMigrationThreadPoolSize(
Integer.parseInt(properties.getProperty("migrationThreadPoolSize", "20")));
} catch (IOException e) {
logger.error("Fail to load properties: ", e);
}
@@ -361,6 +363,8 @@ private void loadPropsFromEnv() {
EnvUtils.loadEnv(
"streamParallelGroupByWorkerNum",
config.getStreamParallelGroupByWorkerNum()));
config.setMigrationThreadPoolSize(
EnvUtils.loadEnv("migrationThreadPoolSize", config.getMigrationThreadPoolSize()));
}

private void loadUDFListFromFile() {
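For context on the new knob: a plausible consumer of migrationThreadPoolSize, sized once at startup. The holder class is an assumption for illustration; StorageMigrationExecutor's internals are not shown in this PR, and the ConfigDescriptor.getInstance().getConfig() accessor is assumed from the loader code above.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import cn.edu.tsinghua.iginx.conf.Config;
import cn.edu.tsinghua.iginx.conf.ConfigDescriptor;

// Hypothetical holder: one fixed pool shared by all storage-unit migrations,
// defaulting to 20 workers via conf/config.properties above.
public final class MigrationPoolSketch {
    private final ExecutorService migrationPool;

    public MigrationPoolSketch() {
        Config config = ConfigDescriptor.getInstance().getConfig();
        this.migrationPool = Executors.newFixedThreadPool(config.getMigrationThreadPoolSize());
    }

    public ExecutorService pool() {
        return migrationPool;
    }
}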
@@ -34,6 +34,7 @@
import cn.edu.tsinghua.iginx.metadata.storage.etcd.ETCDMetaStorage;
import cn.edu.tsinghua.iginx.metadata.storage.zk.ZooKeeperMetaStorage;
import cn.edu.tsinghua.iginx.metadata.utils.ReshardStatus;
import cn.edu.tsinghua.iginx.migration.storage.StorageMigrationPlan;
import cn.edu.tsinghua.iginx.monitor.HotSpotMonitor;
import cn.edu.tsinghua.iginx.monitor.RequestsMonitor;
import cn.edu.tsinghua.iginx.policy.simple.ColumnCalDO;
@@ -439,6 +440,286 @@ public boolean addStorageEngines(List<StorageEngineMeta> storageEngineMetas) {
return false;
}

@Override
public boolean storeMigrationPlan(StorageMigrationPlan plan) {
return storage.storeMigrationPlan(plan);
}

@Override
public List<StorageMigrationPlan> scanStorageMigrationPlan() {
return storage.scanStorageMigrationPlan();
}

@Override
public StorageMigrationPlan getStorageMigrationPlan(long storageId) {
return storage.getStorageMigrationPlan(storageId);
}

@Override
public boolean transferMigrationPlan(long id, long from, long to) {
return storage.transferMigrationPlan(id, from, to);
}

@Override
public boolean deleteMigrationPlan(long id) {
return storage.deleteMigrationPlan(id);
}
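
The five methods above only delegate to the underlying meta storage; together they let an iginx node persist a migration plan, hand it over when its owner dies, and clean it up when migration finishes. A hypothetical recovery snippet (the getters on StorageMigrationPlan are assumptions, the IMetaManager calls are from this diff):

// Hypothetical recovery path: a surviving iginx node adopts the unfinished
// migration plans of a crashed peer before resuming or deleting them.
static void adoptOrphanedPlans(IMetaManager meta, long crashedNodeId, long localNodeId) {
    for (StorageMigrationPlan plan : meta.scanStorageMigrationPlan()) {
        // getMigrationOwner()/getStorageId() are assumed getters on the plan
        if (plan.getMigrationOwner() == crashedNodeId) {
            meta.transferMigrationPlan(plan.getStorageId(), crashedNodeId, localNodeId);
        }
    }
}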

@Override
public Map<String, String> startMigrationStorageUnits(
Map<String, Long> migrationMap, boolean migrationData) {
if (migrationData) {
return startMigrationStorageUnitsWithData(migrationMap);
}
return startMigrationStorageUnitsWithoutData(migrationMap);
}

private Map<String, String> startMigrationStorageUnitsWithData(Map<String, Long> migrationMap) {
try {
Map<String, String> migrationStorageUnitMap = new HashMap<>();
storage.lockStorageUnit();
for (String storageUnitId : migrationMap.keySet()) {
StorageUnitMeta storageUnit = getStorageUnit(storageUnitId);
if (storageUnit.getState() == StorageUnitState.DISCARD) { // migration already finished
continue;
}
if (storageUnit.getMigrationTo() != null) { // migration in progress
migrationStorageUnitMap.put(storageUnitId, storageUnit.getMigrationTo());
continue;
}
String newStorageUnitId = storage.addStorageUnit();
StorageUnitMeta clonedStorageUnit = storageUnit.clone();
StorageUnitMeta newStorageUnit =
clonedStorageUnit.migrationStorageUnitMeta(
newStorageUnitId, id, migrationMap.get(storageUnitId));
// update the new storage unit
cache.updateStorageUnit(newStorageUnit);
for (StorageUnitHook hook : storageUnitHooks) {
hook.onChange(null, newStorageUnit);
}
storage.updateStorageUnit(newStorageUnit);
// update the old storage unit
cache.updateStorageUnit(clonedStorageUnit);
for (StorageUnitHook hook : storageUnitHooks) {
hook.onChange(storageUnit, clonedStorageUnit);
}
storage.updateStorageUnit(clonedStorageUnit);
migrationStorageUnitMap.put(storageUnitId, newStorageUnitId);
}
return migrationStorageUnitMap;
} catch (MetaStorageException e) {
logger.error("migration storage unit error: ", e);
} finally {
try {
storage.releaseStorageUnit();
} catch (MetaStorageException e) {
logger.error("release storage unit lock error: ", e);
}
}
return null;
}

private Map<String, String> startMigrationStorageUnitsWithoutData(
Map<String, Long> migrationMap) { // key: the shard actually being migrated; value: the actual destination
try {
Map<String, String> migrationStorageUnitMap = new HashMap<>();

List<StorageUnitMeta> beforeStorageUnits = new ArrayList<>();
List<StorageUnitMeta> afterStorageUnits = new ArrayList<>();

storage.lockStorageUnit();
for (String storageUnitId : migrationMap.keySet()) {
String newStorageUnitId = storage.addStorageUnit();

StorageUnitMeta storageUnit =
getStorageUnit(storageUnitId); // the unit that lived on the failed node, marked DISCARD directly
if (storageUnit.getState()
== StorageUnitState.DISCARD) { // already migrating or mid-migration, so this request is a retry
StorageUnitMeta targetUnit = getStorageUnit(storageUnit.getMigrationTo());
if (targetUnit.getState() == StorageUnitState.CREATING) {
// a replica must be chosen here as the source of the data migration
if (storageUnit.isMaster()) {
if (storageUnit.getReplicas() != null
&& storageUnit.getReplicas().size() > 0) {
migrationStorageUnitMap.put(
storageUnit
.getReplicas()
.get(storageUnit.getReplicas().size() - 1)
.getId(),
newStorageUnitId);
}
} else { // if the failed shard is a replica, just find its master and use the master as the data source
migrationStorageUnitMap.put(
storageUnit.getMasterId(), newStorageUnitId);
}
}
continue;
}

StorageUnitMeta clonedStorageUnit = storageUnit.clone();
beforeStorageUnits.add(storageUnit);
afterStorageUnits.add(clonedStorageUnit);

StorageUnitMeta newStorageUnit =
clonedStorageUnit.migrationStorageUnitMeta(
newStorageUnitId, id, migrationMap.get(storageUnitId));
clonedStorageUnit.setState(StorageUnitState.DISCARD);

beforeStorageUnits.add(null);
afterStorageUnits.add(newStorageUnit);

// find a suitable source copy for the unit on the failed node
if (storageUnit.isMaster()) {
if (storageUnit.getReplicas() != null && storageUnit.getReplicas().size() > 0) {
migrationStorageUnitMap.put(
storageUnit.getReplicas().get(0).getId(), newStorageUnitId);
}
} else { // if the failed shard is a replica, just find its master and use the master as the data source
migrationStorageUnitMap.put(storageUnit.getMasterId(), newStorageUnitId);
}

// update the replica relationships
if (storageUnit.isMaster()) {
List<StorageUnitMeta> slaveUnits = storageUnit.getReplicas();
for (StorageUnitMeta unit : slaveUnits) {
StorageUnitMeta clone = unit.clone();
clone.setMasterId(newStorageUnitId);
beforeStorageUnits.add(unit);
afterStorageUnits.add(clone);
newStorageUnit.addReplica(clone);
}
} else {
StorageUnitMeta masterUnit = getStorageUnit(storageUnit.getMasterId());
masterUnit.addReplica(newStorageUnit);
masterUnit.removeReplica(storageUnit);
}
}

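// apply all staged changes in order: refresh the cache, fire hooks, persist to meta storage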
for (int i = 0; i < beforeStorageUnits.size(); i++) {
StorageUnitMeta before = beforeStorageUnits.get(i);
StorageUnitMeta after = afterStorageUnits.get(i);
cache.updateStorageUnit(after);
for (StorageUnitHook hook : storageUnitHooks) {
hook.onChange(before, after);
}
storage.updateStorageUnit(after);
}

return migrationStorageUnitMap;
} catch (MetaStorageException e) {
logger.error("migration storage unit error: ", e);
} finally {
try {
storage.releaseStorageUnit();
} catch (MetaStorageException e) {
logger.error("release storage unit lock error: ", e);
}
}
return null;
}

@Override
public boolean finishMigrationStorageUnit(String storageUnitId, boolean migrationData) {
if (migrationData) {
return finishMigrationStorageUnitWithData(storageUnitId);
}
return finishMigrationStorageUnitWithoutData(storageUnitId);
}

public boolean finishMigrationStorageUnitWithData(String storageUnitId) {
logger.info("call finish migration for {} {}", storageUnitId, true);
try {
List<StorageUnitMeta> beforeStorageUnits = new ArrayList<>();
List<StorageUnitMeta> afterStorageUnits = new ArrayList<>();
storage.lockStorageUnit();
StorageUnitMeta sourceStorageUnit = getStorageUnit(storageUnitId);
StorageUnitMeta clonedSourceStorageUnit = sourceStorageUnit.clone();
clonedSourceStorageUnit.setState(StorageUnitState.DISCARD);
beforeStorageUnits.add(sourceStorageUnit);
afterStorageUnits.add(clonedSourceStorageUnit);

StorageUnitMeta targetStorageUnit = getStorageUnit(sourceStorageUnit.getMigrationTo());
StorageUnitMeta clonedTargetStorageUnit = targetStorageUnit.clone();
clonedTargetStorageUnit.setState(StorageUnitState.NORMAL);
beforeStorageUnits.add(targetStorageUnit);
afterStorageUnits.add(clonedTargetStorageUnit);
if (sourceStorageUnit.isMaster()) {
List<StorageUnitMeta> slaveUnits = sourceStorageUnit.getReplicas();
for (StorageUnitMeta unit : slaveUnits) {
StorageUnitMeta clone = unit.clone();
clone.setMasterId(clonedTargetStorageUnit.getId());
beforeStorageUnits.add(unit);
afterStorageUnits.add(clone);
clonedTargetStorageUnit.addReplica(clone);
}
} else {
StorageUnitMeta masterStorageUnit =
getStorageUnit(clonedSourceStorageUnit.getMasterId());
masterStorageUnit.addReplica(clonedTargetStorageUnit);
masterStorageUnit.removeReplica(clonedSourceStorageUnit);
}

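// flush the staged changes: cache first, then registered hooks, then persistent storage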
for (int i = 0; i < beforeStorageUnits.size(); i++) {
StorageUnitMeta before = beforeStorageUnits.get(i);
StorageUnitMeta after = afterStorageUnits.get(i);
cache.updateStorageUnit(after);
for (StorageUnitHook hook : storageUnitHooks) {
hook.onChange(before, after);
}
storage.updateStorageUnit(after);
}

logger.info("finish migration for {} {}", storageUnitId, true);
return true;
} catch (MetaStorageException e) {
logger.error("migration storage unit error: ", e);
} finally {
try {
storage.releaseStorageUnit();
} catch (MetaStorageException e) {
logger.error("release storage unit lock error: ", e);
}
}
return false;
}

public boolean finishMigrationStorageUnitWithoutData(String storageUnitId) {
logger.info("call finish migration for {} {}", storageUnitId, false);
try {
storage.lockStorageUnit();
StorageUnitMeta sourceStorageUnit = getStorageUnit(storageUnitId);
String targetStorageUnitId = sourceStorageUnit.getMigrationTo();

StorageUnitMeta storageUnit = getStorageUnit(targetStorageUnitId);
StorageUnitMeta clonedStorageUnit = storageUnit.clone();
if (storageUnit.getState() == StorageUnitState.NORMAL) {
return true;
}
clonedStorageUnit.setState(StorageUnitState.NORMAL);
cache.updateStorageUnit(clonedStorageUnit);
for (StorageUnitHook hook : storageUnitHooks) {
hook.onChange(storageUnit, clonedStorageUnit);
}
storage.updateStorageUnit(clonedStorageUnit);
logger.info("finish migration for {} {}", storageUnitId, false);
return true;
} catch (MetaStorageException e) {
logger.error("migration storage unit error: ", e);
} finally {
try {
storage.releaseStorageUnit();
} catch (MetaStorageException e) {
logger.error("release storage unit lock error: ", e);
}
}
return false;
}

@Override
public boolean updateStorageUnit(StorageUnitMeta storageUnit) {
return false;
}

@Override
public boolean updateStorageEngine(long storageID, StorageEngineMeta storageEngineMeta) {
if (getStorageEngine(storageID) == null) {
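Taken together, the additions above give a migration executor a three-step protocol: stage the move in metadata, copy the data, then flip the unit states. A condensed hypothetical driver for the with-data path (the real StorageMigrationExecutor is not part of this diff; only the two IMetaManager methods are from the code above):

import java.util.Map;

import cn.edu.tsinghua.iginx.metadata.IMetaManager;

// Hypothetical driver class; names other than the IMetaManager calls are assumptions.
final class MigrationDriverSketch {
    /** migrationMap: storage-unit id -> target storage-engine id. */
    static boolean migrateUnits(IMetaManager meta, Map<String, Long> migrationMap) {
        // 1. stage: clone each source unit onto its target engine; the returned map
        //    pairs each source unit with its freshly created target unit
        Map<String, String> pending = meta.startMigrationStorageUnits(migrationMap, true);
        if (pending == null) {
            return false; // meta update failed, nothing was finalized
        }
        for (Map.Entry<String, String> e : pending.entrySet()) {
            copyStorageUnitData(e.getKey(), e.getValue()); // 2. move the data
            // 3. flip states: source -> DISCARD, target -> NORMAL, rewire replicas
            if (!meta.finishMigrationStorageUnit(e.getKey(), true)) {
                return false;
            }
        }
        return true;
    }

    // Hypothetical data mover; a real implementation would stream the unit's rows
    // from the old storage engine to the new one.
    private static void copyStorageUnitData(String fromUnitId, String toUnitId) {}
}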