braft 提供了一系列 API 用来控制复制主或者具体节点,可以选择在程序里面调用 API 或者使用 braft_cli 来给节点发远程控制命令,通过 cli 发送命令的方式更改配置实际上最终也是调用的这些接口,配置变更分为以下三类。
在分布式系统中,机器故障、扩容、副本均衡是管理平面需要解决的基本问题,braft 提供了几种方式:
- 增加一个节点
- 删除一个节点
- 全量替换现有节点列表
class Node {
public:
// Add a new peer to the raft group. done->Run() would be invoked after this
// operation finishes, describing the detailed result.
void add_peer(const PeerId& peer, Closure* done);
// Remove the peer from the raft group. done->Run() would be invoked after
// this operation finishes, describing the detailed result.
void remove_peer(const PeerId& peer, Closure* done);
// Change the configuration of the raft group to |new_peers| , done->Run()
// would be invoked after this operation finishes, describing the detailed
// result.
void change_peers(const Configuration& new_peers, Closure* done);
}
节点变更分为几个阶段:
- 追赶阶段:如果新的节点配置相对于当前有新增的一个或者多个节点,Leader 对应的 Replicator 先把最新的 Snapshot 在这些节点中安装,然后开始同步之后的日志。等到所有的新节点数据都追的差不多,就开始进入下一阶段。
- 追赶是为了避免新加入的节点数据和集群相差过远而影响集群的可用性,并不会影响数据安全性。
- 在追赶阶段完成前, **只有 **Leader 知道这些新节点的存在,这个节点都不会被记入到集群的决策集合中,包括选主和日志提交的判定。追赶阶段任意节点失败,则这次节点变更就会被标记为失败。
- 联合选举阶段:Leader 会将旧节点配置和新节点配置写入 Log,在这个阶段之后直到下一个阶段之前,所有的选举和日志同步都需要在新老节点之间达到多数。 这里和标准算法有一点不同, 考虑到和之前实现的兼容性,如果这次只变更了一个节点,则直接进入下一阶段。
- 新配置同步阶段:当联合选举日志正式被新旧集群接受之后,Leader 将新节点配置写入 Log,之后所有的 Log 和选举只需要在新集群中达成一致。等待日志提交到新集群中的多数节点中之后,正式完全节点变更。
- 清理阶段:Leader 会将多余的 Replicator(如果有)关闭,特别如果当 Leader 本身已经从节点配置中被移除,这时候 Leader 会执行 stepdown 并且唤醒一个合适的节点触发选举。
当考虑节点删除的时候,情况会变得有些复杂,由于判断成功提交的节点数量变少,可能会出现在前面的日志没有成功提交的情况下,后面的日志已经被判断已经提交。这时候为了状态机的操作有序性,即使之前的日志还未提交,我们也会强制判断为成功。
举个例子:
当前集群为 (A, B, C, D),其中 C D 属于故障,由于多数节点处于故障阶段,存在 10 条还未被提交的日志(A B 已经写入,C D 未写入),这时候发起操作,将 D 从集群中删除,这条日志的成功判定条件变为在 (A, B, C),这时候只需要 A、B 都成功写入这条日志即可认为这个日志已经成功提交,但是之前还存在 10 条未写入日志。这时候我们会强制认为之前的 10 条已经成功提交。
这个 case 比较极端,通常这个情况下 leader 都会 step down,集群会进入无主状态,需要至少修复 CD 中的一个节点之后集群才能正常提供服务。
我们先来看看 NodeImpl::add_peers()
的实现:
void NodeImpl::add_peer(const PeerId& peer, Closure* done) {
BAIDU_SCOPED_LOCK(_mutex);
Configuration new_conf = _conf.conf;
new_conf.add_peer(peer);
return unsafe_register_conf_change(_conf.conf, new_conf, done);
}
NodeImpl::remove_peer()
和 NodeImpl::change_peers()
的代码类似,最后都会调用 NodeImpl::unsafe_register_conf_change()
。该函数会检查当前状态是否是 Leader,以及 _conf_ctx
是否忙碌(也就是是否有另一个配置变更在发生),如果一切正常就会调用 NodeImpl::ConfigurationCtx::start()
开始配置变更。
在分析 start 之前,我们先看看上面提到的节点变更的几个阶段状态转移实现:
- STAGE_CATCHING_UP:追赶阶段,如果有新的节点加入就需要追赶之后再配置变更
- STAGE_JOINT:共同一致状态,新旧配置共同起作用
- STAGE_STABLE:稳定状态,说明新的配置在 majority 上提交了,进入稳定状态让新配置单独起作用
void NodeImpl::ConfigurationCtx::next_stage() {
CHECK(is_busy());
switch (_stage) {
case STAGE_CATCHING_UP:
if (_nchanges > 1) {
_stage = STAGE_JOINT;
Configuration old_conf(_old_peers);
return _node->unsafe_apply_configuration(Configuration(_new_peers), &old_conf, false);
}
// Skip joint consensus since only one peer has been changed here. Make
// it a one-stage change to be compitible with the legacy
// implementation.
case STAGE_JOINT:
_stage = STAGE_STABLE;
return _node->unsafe_apply_configuration(Configuration(_new_peers), NULL, false);
case STAGE_STABLE:
{
bool should_step_down = _new_peers.find(_node->_server_id) == _new_peers.end();
butil::Status st = butil::Status::OK();
reset(&st);
if (should_step_down) {
_node->step_down(_node->_current_term, true, butil::Status(ELEADERREMOVED, "This node was removed"));
}
return;
}
case STAGE_NONE:
CHECK(false) << "Can't reach here";
return;
}
}
下面分步说明 NodeImpl::ConfigurationCtx::start()
函数所做的工作:
-
设置当前状态为
STAGE_CATCHING_UP
,并从新旧配置中取出_old_peers
和_new_peers
,通过比较两者,得出需要增加的配置 adding 和需要删除的配置removingCHECK(!is_busy()); CHECK(!_done); _done = done; _stage = STAGE_CATCHING_UP; old_conf.list_peers(&_old_peers); new_conf.list_peers(&_new_peers); Configuration adding; Configuration removing; new_conf.diffs(old_conf, &adding, &removing); _nchanges = adding.size() + removing.size();
-
如果没有新增节点,就直接进入下一个阶段(STAGE_JOINT):
if (adding.empty()) { ss << ", begin removing."; LOG(INFO) << ss.str(); return next_stage(); }
-
否则需要让新加入的节点追赶日志(catch up)才能进入下个阶段,将新的节点添加到自己的
_replicator_group
里面,然后调用_node->_replicator_group.wait_caughtup()
,等到新节点的日志追赶成功就调用回调进入下一个 stage,是否追赶上的判断标志是新加入节点和 Leader 之间的 log index 的差距小于 catchup_margin,catchup_margin 由NodeOption::catchup_margin
变量指定,默认是 1000。。adding.list_peers(&_adding_peers); for (std::set<PeerId>::const_iterator iter = _adding_peers.begin(); iter != _adding_peers.end(); ++iter) { if (_node->_replicator_group.add_replicator(*iter) != 0) { return on_caughtup(_version, *iter, false); } OnCaughtUp* caught_up = new OnCaughtUp(_node, _node->_current_term, *iter, _version); timespec due_time = butil::milliseconds_from_now(_node->_options.get_catchup_timeout_ms()); if (_node->_replicator_group.wait_caughtup(*iter, _node->_options.catchup_margin, &due_time, caught_up) != 0) { LOG(WARNING) << "node " << _node->node_id() << " wait_caughtup failed, peer " << *iter; delete caught_up; return on_caughtup(_version, *iter, false); } }
ReplicatorGroup::wait_caughtup()
的回调是 OnCaughtUp
,最终会调用 NodeImpl::on_caughtup()
处理,分为三种情况:
- 成功 Caught Up:调用
NodeImpl::ConfigurationCtx::on_caughtup()
进入下一阶段 - 如果超时还没有赶上,并且节点还存活的话就重试。
- 否则失败
void NodeImpl::on_caughtup(const PeerId& peer, int64_t term,
int64_t version, const butil::Status& st) {
BAIDU_SCOPED_LOCK(_mutex);
// ... ...
if (st.ok()) { // Caught up successfully
_conf_ctx.on_caughtup(version, peer, true);
return;
}
// Retry if this peer is still alive
if (st.error_code() == ETIMEDOUT
&& (butil::monotonic_time_ms() - _replicator_group.last_rpc_send_timestamp(peer))
<= _options.election_timeout_ms) {
OnCaughtUp* caught_up = new OnCaughtUp(this, _current_term, peer, version);
timespec due_time = butil::milliseconds_from_now(_options.get_catchup_timeout_ms());
if (0 == _replicator_group.wait_caughtup(peer, _options.catchup_margin, &due_time, caught_up)) {
return;
} else {
delete caught_up;
}
}
_conf_ctx.on_caughtup(version, peer, false);
}
ReplicatorGroup::wait_caughtup()
会调用 Replicator::wait_for_caught_up()
,后者会启动一个定时器并保存 done 回调(即 OnCaughtUp 对象),定时器超时后会调用 Replicator::_on_catch_up_timedout()
,该函数会调用 Replicator::_notify_on_caught_up()
,然后在一个新的 bthread 中运行 done 回调函数检查是否成功 Catch Up:
void Replicator::wait_for_caught_up(ReplicatorId id,
int64_t max_margin,
const timespec* due_time,
CatchupClosure* done) {
// ... ...
if (due_time != NULL) {
done->_has_timer = true;
if (bthread_timer_add(&done->_timer,
*due_time,
_on_catch_up_timedout,
(void*)id) != 0) {
CHECK_EQ(0, bthread_id_unlock(dummy_id));
LOG(ERROR) << "Fail to add timer";
done->status().set_error(EINVAL, "Duplicated call");
run_closure_in_bthread(done);
return;
}
}
r->_catchup_closure = done;
// ... ...
}
Replicator::_on_catch_up_timedout()
和 Replicator::_notify_on_caught_up()
的部分实现:
void Replicator::_on_catch_up_timedout(void* arg) {
bthread_id_t id = { (uint64_t)arg };
Replicator* r = NULL;
if (bthread_id_lock(id, (void**)&r) != 0) {
LOG(WARNING) << "Replicator is destroyed when catch_up_timedout.";
return;
}
r->_notify_on_caught_up(ETIMEDOUT, false);
CHECK_EQ(0, bthread_id_unlock(id))
<< "Fail to unlock" << id;
}
void Replicator::_notify_on_caught_up(int error_code, bool before_destroy) {
// ... ...
Closure* saved_catchup_closure = _catchup_closure;
_catchup_closure = NULL;
return run_closure_in_bthread(saved_catchup_closure);
}
当 Caught Up 定时器超时后会检测是否成功 Caught Up(NodeImpl::on_caughtup()
),如果成功会调用 NodeImpl::ConfigurationCtx::next_stage()
进入下一个阶段,然后调用 NodeImpl::unsafe_apply_configuration()
函数,代码如下:
void NodeImpl::unsafe_apply_configuration(const Configuration& new_conf,
const Configuration* old_conf,
bool leader_start) {
CHECK(_conf_ctx.is_busy());
LogEntry* entry = new LogEntry();
entry->AddRef();
entry->id.term = _current_term;
entry->type = ENTRY_TYPE_CONFIGURATION;
entry->peers = new std::vector<PeerId>;
new_conf.list_peers(entry->peers);
if (old_conf) {
entry->old_peers = new std::vector<PeerId>;
old_conf->list_peers(entry->old_peers);
}
ConfigurationChangeDone* configuration_change_done =
new ConfigurationChangeDone(this, _current_term, leader_start, _leader_lease.lease_epoch());
// Use the new_conf to deal the quorum of this very log
_ballot_box->append_pending_task(new_conf, old_conf, configuration_change_done);
std::vector<LogEntry*> entries;
entries.push_back(entry);
_log_manager->append_entries(&entries,
new LeaderStableClosure(
NodeId(_group_id, _server_id),
1u, _ballot_box));
_log_manager->check_and_set_configuration(&_conf);
}
它会生成一个类型为 ENTRY_TYPE_CONFIGURATION
的 LogEntry,将 entry 的 peers 设置为新配置,old_peers 设置为旧配置。然后把这个任务添加到投票箱里面(注意该任务已经需要新旧配置都达到多数派了),并调用 LogManager::append_entries
把 entry append 到内存并持久化。对于 ENTRY_TYPE_CONFIGURATION
的 LogEntry, LogManager::append_entries()
里面会特殊处理,把这个配置 append 到 _config_manager
里面:
// location: LogManager::append_entries
for (size_t i = 0; i < entries->size(); ++i) {
// Add ref for disk_thread
(*entries)[i]->AddRef();
if ((*entries)[i]->type == ENTRY_TYPE_CONFIGURATION) {
ConfigurationEntry conf_entry(*((*entries)[i]));
_config_manager->add(conf_entry);
}
}
之后会调用 LogManager::check_and_set_configuration()
把 _conf
设置为刚刚放进去的新配置(其中 old_conf 为之前的配置)。在这个时间点之后,产生的任务,在放到投票箱的时候 _conf.stable()
会返回 false
,然后将第二个参数设置为 _conf.old_conf
(参考 NodeImpl::apply(LogEntryAndClosure[])
函数)。因此这个时间点之后产生的任务需要新旧两个配置共同决定是否提交,也就是 JOINT 状态。
然后它会向普通的 entry 一样被复制到 Follower,等成功提交之后会调用 ConfigurationChangeDone 回调(NodeImpl::on_configuration_change_done()
),进入下一个 stage。
这个时候就可以安全的提交新的配置了,仍然调用 NodeImpl::unsafe_apply_configuration
函数生成新的配置 entry,不同的是 old_conf 设置为 NULL,调用 LogManager::append_entries()
,把当前配置改为新配置,新配置立即生效。
case STAGE_JOINT:
_stage = STAGE_STABLE;
return _node->unsafe_apply_configuration(
Configuration(_new_peers), NULL, false);
当包含新配置的 entry 被成功提交后继续调用 next_stage,这时候 Leader 检查自己是否在新配置里面,如果不在,就退步成 Follower。
配置变更作为一个 AppendEntries RPC 发给 Follower,Follower 也是在 NodeImpl::handle_append_entries_request()
中处理该 entry。如果 entry.peers_size() > 0
说明是一个配置变更的 RPC,之后和在 Leader 上生成 LogEntry 的逻辑差不多,把新旧配置加到 LogEntry 中。
for (int i = 0; i < request->entries_size(); i++) {
index++;
const EntryMeta& entry = request->entries(i);
if (entry.type() != ENTRY_TYPE_UNKNOWN) {
LogEntry* log_entry = new LogEntry();
log_entry->AddRef();
log_entry->id.term = entry.term();
log_entry->id.index = index;
log_entry->type = (EntryType)entry.type();
if (entry.peers_size() > 0) {
log_entry->peers = new std::vector<PeerId>;
for (int i = 0; i < entry.peers_size(); i++) {
log_entry->peers->push_back(entry.peers(i));
}
CHECK_EQ(log_entry->type, ENTRY_TYPE_CONFIGURATION);
if (entry.old_peers_size() > 0) {
log_entry->old_peers = new std::vector<PeerId>;
for (int i = 0; i < entry.old_peers_size(); i++) {
log_entry->old_peers->push_back(entry.old_peers(i));
}
}
} else {
CHECK_NE(entry.type(), ENTRY_TYPE_CONFIGURATION);
}
if (entry.has_data_len()) {
int len = entry.data_len();
data_buf.cutn(&log_entry->data, len);
}
entries.push_back(log_entry);
}
}
然后调用 LogManager::append_entries()
和 LogManager::check_and_set_configuration()
变更配置。
FollowerStableClosure* c = new FollowerStableClosure(
cntl, request, response, done_guard.release(),
this, _current_term);
_log_manager->append_entries(&entries, c);
// update configuration after _log_manager updated its memory status
_log_manager->check_and_set_configuration(&_conf);
当多数节点故障的时候,是不能通过 add_peer/remove_peer/change_peers 进行节点变更的,这个时候安全的做法是等待多数节点恢复,能够保证数据安全。如果业务追求服务的可用性,放弃数据安全性的话,可以使用 reset_peers 飞线设置复制组 Configuration。
class Node {
public:
// Reset the configuration of this node individually, without any repliation
// to other peers before this node beomes the leader. This function is
// supposed to be inovoked when the majority of the replication group are
// dead and you'd like to revive the service in the consideration of
// availability.
// Notice that neither consistency nor consensus are guaranteed in this
// case, BE CAREFULE when dealing with this method.
butil::Status reset_peers(const Configuration& new_peers);
}
reset_peer 之后,新的 Configuration 的节点会开始重新选主,当新的 leader 选主成功之后,会写一条新 Configuration 的 Log,这条 Log 写成功之后,reset_peer 才算成功。如果中间又发生了失败的话,外部需要重新选取 peers 并发起 reset_peers。
不建议使用 reset_peers,reset_peers 会破坏 raft 对数据一致性的保证,而且可能会造成脑裂。例如,{A B C D E} 组成的复制组 G,其中 {C D E} 故障,将 {A B} set_peer 成功恢复复制组 G',{C D E} 又重新启动它们也会形成一个复制组 G'',这样复制组 G 中会存在两个 Leader,且 {A B} 这两个复制组中都存在,其中的follower 会接收两个 leader 的 AppendEntries,当前只检测 term 和 index,可能会导致其上数据错乱。
butil::Status NodeImpl::reset_peers(const Configuration& new_peers) {
// ... ...
Configuration new_conf(new_peers);
LOG(WARNING) << "node " << _group_id << ":" << _server_id
<< " set_peer from "
<< _conf.conf << " to " << new_conf;
// change conf and step_down
_conf.conf = new_conf;
_conf.old_conf.reset();
butil::Status status;
status.set_error(ESETPEER, "Raft node set peer normally");
step_down(_current_term + 1, false, status);
return butil::Status::OK();
}
class Node {
public:
// Try transferring leadership to |peer|.
// If peer is ANY_PEER, a proper follower will be chosen as the leader for
// the next term.
// Returns 0 on success, -1 otherwise.
int transfer_leadership_to(const PeerId& peer);
}
在一些场景中,我们会需要外部强制将 Leader 切换到另外的节点, 比如:
- 主节点要重启,这时候发起一次主迁移能够减少集群的不可服务时间
- 主节点所在的机器过于繁忙,我们需要迁移到另外一个相对空闲的机器中
- 复制组跨 IDC 部署,我们希望主节点存在于离 Client 延时最小的集群中
braft 实现了主迁移算法,这个算法包含如下步骤:
- 主停止写入,这时候所有的 apply 会报错
- 继续向所有的 Follower 同步日志,当发现目标节点的日志已经和主一样多之后,向对应节点发起一个 TimeoutNow RPC
- 节点收到 TimeoutNowRequest 之后,直接变为 Candidate,增加 term,并开始进入选主
- 主收到 TimeoutNowResponse 之后,开始 step down
- 如果在 election_timeout_ms 时间内主没有 step down,会取消主迁移操作,开始重新接受写入请求
该函数首先判断是否指定了 peer_id(传递的参数),如果没有,就从 replicator_group
中找出 next_index
最大的 Follower 作为目标。
PeerId peer_id = peer;
// if peer_id is ANY_PEER(0.0.0.0:0:0), the peer with the largest
// last_log_id will be selected.
if (peer_id == ANY_PEER) {
LOG(INFO) << "node " << _group_id << ":" << _server_id
<< " starts to transfer leadership to any peer.";
// find the next candidate which is the most possible to become new leader
if (_replicator_group.find_the_next_candidate(&peer_id, _conf) != 0) {
return -1;
}
}
如果一切正常,获取当前的 last_log_index
然后调用 ReplicatorGroup::transfer_leadership_to()
向 Follower 发起 TimeoutNowRequest 请求:
int NodeImpl::transfer_leadership_to(const PeerId& peer) {
// ... ...
const int64_t last_log_index = _log_manager->last_log_index();
const int rc = _replicator_group.transfer_leadership_to(peer_id, last_log_index);
// ... ...
}
int ReplicatorGroup::transfer_leadership_to(const PeerId& peer, int64_t log_index) {
// ... ...
return Replicator::transfer_leadership(rid, log_index);
}
int Replicator::transfer_leadership(ReplicatorId id, int64_t log_index) {
Replicator* r = NULL;
bthread_id_t dummy = { id };
const int rc = bthread_id_lock(dummy, (void**)&r);
if (rc != 0) {
return rc;
}
// dummy is unlock in _transfer_leadership
return r->_transfer_leadership(log_index);
}
int Replicator::_transfer_leadership(int64_t log_index) {
if (_has_succeeded && _min_flying_index() > log_index) {
// _id is unlock in _send_timeout_now
_send_timeout_now(true, false);
return 0;
}
// Register log_index so that _on_rpc_returned trigger
// _send_timeout_now if _min_flying_index reaches log_index
_timeout_now_index = log_index;
CHECK_EQ(0, bthread_id_unlock(_id)) << "Fail to unlock " << _id;
return 0;
}
可以向 Follower 发送 TimeoutNowRequest 的条件是该 Follower 的日志已经和当前 Leader 的日志一样多,如果现在可以发送 TimeoutNowRequest,就立刻发送;否则把 last_log_index 记录到 Replicator::_timeout_now_index
中,然后在每次日志复制返回的时候会判断是否可以发送 TimeoutNowRequest,如果可以就发送。
void Replicator::_on_rpc_returned(ReplicatorId id, brpc::Controller* cntl,
AppendEntriesRequest* request,
AppendEntriesResponse* response,
int64_t rpc_send_time) {
// ... ...
// dummy_id is unlock in _send_entries
if (r->_timeout_now_index > 0 && r->_timeout_now_index < r->_min_flying_index()) {
r->_send_timeout_now(false, false);
}
}
Leader 成功发送 TimeoutNowRequest 请求后,将 _state
设置为 STATE_TRANSFERRING
,调用状态机的 on_leader_stop()
回调函数(用户可以自定义),然后开启 _transfer_timer
定时器。
_state = STATE_TRANSFERRING;
butil::Status status;
status.set_error(ETRANSFERLEADERSHIP, "Raft leader is transferring "
"leadership to %s", peer_id.to_string().c_str());
_leader_lease.on_leader_stop();
_fsm_caller->on_leader_stop(status);
LOG(INFO) << "node " << _group_id << ":" << _server_id
<< " starts to transfer leadership to " << peer_id;
_stop_transfer_arg = new StopTransferArg(this, _current_term, peer_id);
if (bthread_timer_add(&_transfer_timer,
butil::milliseconds_from_now(_options.election_timeout_ms),
on_transfer_timeout, _stop_transfer_arg) != 0) {
lck.unlock();
LOG(ERROR) << "Fail to add timer";
on_transfer_timeout(_stop_transfer_arg);
return -1;
}
如果超时还没有 transfer 成功,就调用 NodeImpl::handle_transfer_timeout()
停止 transfer,并将 _state
设置回 STATE_LEADER
,_timeout_now_index 设置成 0:
void on_transfer_timeout(void* arg) {
StopTransferArg* a = (StopTransferArg*)arg;
a->node->handle_transfer_timeout(a->term, a->peer);
delete a;
}
void NodeImpl::handle_transfer_timeout(int64_t term, const PeerId& peer) {
LOG(INFO) << "node " << node_id() << " failed to transfer leadership to peer="
<< peer << " : reached timeout";
BAIDU_SCOPED_LOCK(_mutex);
if (term == _current_term) {
_replicator_group.stop_transfer_leadership(peer);
if (_state == STATE_TRANSFERRING) {
_leader_lease.on_leader_start(term);
_fsm_caller->on_leader_start(term, _leader_lease.lease_epoch());
_state = STATE_LEADER;
_stop_transfer_arg = NULL;
}
}
}
Follower 收到 TimeoutNowRequest 请求后调用 NodeImpl::handle_timeout_now_request
,在 response 里面将 term 设置为 _current_term + 1
,然后调用 elect_self
发起选举。
if (FLAGS_raft_enable_leader_lease) {
// We will disrupt the leader, don't let the old leader
// step down.
response->set_term(_current_term);
} else {
// Increase term to make leader step down
response->set_term(_current_term + 1);
}
response->set_success(true);
// Parallelize Response and election
run_closure_in_bthread(done_guard.release());
elect_self(&lck, request->old_leader_stepped_down());
// Don't touch any mutable field after this point, it's likely out of the
// critical section
if (lck.owns_lock()) {
lck.unlock();
}
Replicator::_on_timeout_now_returned()
会调用 NodeImpl::increase_term_to()
(它会调用 step down),将当前节点回退成 Follower,step down 里面会停止 _transfer_timer
定时器。
void Replicator::_on_timeout_now_returned(
ReplicatorId id, brpc::Controller* cntl,
TimeoutNowRequest* request,
TimeoutNowResponse* response,
bool old_leader_stepped_down) {
std::unique_ptr<brpc::Controller> cntl_guard(cntl);
std::unique_ptr<TimeoutNowRequest> req_guard(request);
std::unique_ptr<TimeoutNowResponse> res_guard(response);
Replicator *r = NULL;
bthread_id_t dummy_id = { id };
if (bthread_id_lock(dummy_id, (void**)&r) != 0) {
return;
}
std::stringstream ss;
ss << "node " << r->_options.group_id << ":" << r->_options.server_id
<< " received TimeoutNowResponse from "
<< r->_options.peer_id;
if (cntl->Failed()) {
ss << " fail : " << cntl->ErrorText();
BRAFT_VLOG << ss.str();
if (old_leader_stepped_down) {
r->_notify_on_caught_up(ESTOP, true);
r->_destroy();
} else {
CHECK_EQ(0, bthread_id_unlock(dummy_id));
}
return;
}
ss << (response->success() ? " success " : "fail:");
BRAFT_VLOG << ss.str();
if (response->term() > r->_options.term) {
NodeImpl *node_impl = r->_options.node;
// Acquire a reference of Node here in case that Node is detroyed
// after _notify_on_caught_up.
node_impl->AddRef();
r->_notify_on_caught_up(EPERM, true);
butil::Status status;
status.set_error(EHIGHERTERMRESPONSE, "Leader receives higher term "
"timeout_now_response from peer:%s", r->_options.peer_id.to_string().c_str());
r->_destroy();
node_impl->increase_term_to(response->term(), status);
node_impl->Release();
return;
}
if (old_leader_stepped_down) {
r->_notify_on_caught_up(ESTOP, true);
r->_destroy();
} else {
CHECK_EQ(0, bthread_id_unlock(dummy_id));
}
}