在节点赢得选举后,就会调用 NodeImpl::become_leader()
进行接下来的工作。
become_leader 主要做了如下工作:
- 停止 VoteTimer
- 把
_state
设置为 Leader,_leader_id
设置为自己。 - 把
_replicator_group
的 term 设置为当前 term,并把其他 peer 添加到_replicator_group
里面。添加的时候会给每个 peer 分配一个 Replicator,并调用Replicator::start()
。
void NodeImpl::become_leader() {
CHECK(_state == STATE_CANDIDATE);
// cancel candidate vote timer
_vote_timer.stop();
_vote_ctx.reset(this);
_state = STATE_LEADER;
_leader_id = _server_id;
_replicator_group.reset_term(_current_term);
_follower_lease.reset();
_leader_lease.on_leader_start(_current_term);
std::set<PeerId> peers;
_conf.list_peers(&peers);
for (std::set<PeerId>::const_iterator
iter = peers.begin(); iter != peers.end(); ++iter) {
if (*iter == _server_id) {
continue;
}
//TODO: check return code
_replicator_group.add_replicator(*iter);
}
// init commit manager
_ballot_box->reset_pending_index(_log_manager->last_log_index() + 1);
// Register _conf_ctx to reject configuration changing before the first log
// is committed.
CHECK(!_conf_ctx.is_busy());
_conf_ctx.flush(_conf.conf, _conf.old_conf);
_stepdown_timer.start();
}
每个 Follower 对应一个 Replicator,负责与该 Follower 的日志复制、Snapshot Install 等工作。
-
==初始化 _next_index 为最新的 log index + 1==(后面会通过第一个空的 AppendEntries RPC 来寻找正确的 next index)
r->_next_index = r->_options.log_manager->last_log_index() + 1;
-
启动 heartbeat_timer
-
发送空的 AppendEntries RPC 通知 Follower 自己的 Leader 的身份和进行 Log Index 的寻找,其他节点收到第一个空的 entry 就会回退成 Follower 并把自己的 leader_id 设置成 request 里面包含的 server_id。
if (bthread_id_create(&r->_id, r, _on_error) != 0) {
LOG(ERROR) << "Fail to create bthread_id"
<< ", group " << options.group_id;
delete r;
return -1;
}
r->_update_last_rpc_send_timestamp(butil::monotonic_time_ms());
r->_start_heartbeat_timer(butil::gettimeofday_us());
// Note: r->_id is unlock in _send_empty_entries, don't touch r ever after
r->_send_empty_entries(false); // is_heartbeats = false
Tips:
Replicator::_next_inde
表示下一个要发给该 Follower 的 Log index,初始值为当前最新的 log index + 1,然后在第一次 append entries 的时候通过 Follower 的 response 进行调整,找到 Follower 实际需要的下一个 log index。- empty AppendEntries request 有两个作用:
- 向 Follower 表明自己 Leader 的身份,Follower 会更新自己的 Leader_id 信息
- 寻找真正的 _next_index
Leader 会为每个 Follower 维护下一个将要发送的 log index,即 Replicator::_next_index
,在一个节点成为 Leader 时,它并不知道每个 Follower 的日志复制情况,所以初始化为了自己本地最新的 log index + 1,然后通过发送一个空的 AppendEntries 来寻找 Follower 真正需要发送的 next_index。
发送心跳也会使用 Replicator::_send_empty_entries(bool is_heartbeat)
函数,区别是这里的参数 is_heartbeat
为 false
。忽略 heartbeat 相关的代码,简化后的代码如下:
void Replicator::_send_empty_entries(bool is_heartbeat) {
// ... ...
// _next_index 表示下一个要发给 replicator 的 log index,初始值是本地的 last_log_idnex + 1
if (_fill_common_fields(request.get(), _next_index - 1, is_heartbeat) != 0) {
CHECK(!is_heartbeat);
// _id is unlock in _install_snapshot
return _install_snapshot();
}
if (is_heartbeat) {
// ... ...
} else {
_st.st = APPENDING_ENTRIES;
_st.first_log_index = _next_index;
_st.last_log_index = _next_index - 1;
CHECK(_append_entries_in_fly.empty());
CHECK_EQ(_flying_append_entries_size, 0);
_append_entries_in_fly.push_back(FlyingAppendEntriesRpc(_next_index, 0, cntl->call_id()));
_append_entries_counter++;
}
google::protobuf::Closure* done = brpc::NewCallback(
is_heartbeat ? _on_heartbeat_returned : _on_rpc_returned, // 在当前场景下回调函数是 _on_rpc_returned
_id.value, cntl.get(), request.get(), response.get(),
butil::monotonic_time_ms());
RaftService_Stub stub(&_sending_channel);
stub.append_entries(cntl.release(), request.release(), response.release(), done);
CHECK_EQ(0, bthread_id_unlock(_id)) << "Fail to unlock " << _id;
}
Replicator::_fill_common_fields()
用于填充 AppendEntriesRequest,第一次的 _fill_common_fields 一定是会成功的,我们先忽略失败的情况。- 在
Replicator::_append_entries_in_fly
中记录下当前已发送的 rpc 的信息,==注意这里 size 是 0==;并递增Replicator::_append_entries_counter
,这两个值在后面收到 response 后会用到。 - 发送 RPC,其回调是
Replicator::_on_rpc_returned()
AppendEntriesRequest 定义:
message AppendEntriesRequest {
required string group_id = 1;
required string server_id = 2;
required string peer_id = 3;
required int64 term = 4;
required int64 prev_log_term = 5;
required int64 prev_log_index = 6;
repeated EntryMeta entries = 7;
required int64 committed_index = 8;
};
NodeImpl::handle_append_entries_request()
函数处理所有的 AppendEntries RPC,包括空 AppendEntries、Heartbeat 和 日志复制,在这里我们只关心空 AppendEntries 相关的逻辑(实际上 Heartbeat 与空 AppendEntries 无法区分,entries_size 都是 0)。
-
各种 check
-
如果 request 中的 term 小于自己的 term,则将 response 的 success 设置为 false,term 设置为自己的 term 并返回
-
如果 request 中的 term 大于自己的 term,则 step down
-
-
如果当前节点的
leader_id
为空,保存server_id
为leader_id
-
更新
_last_leader_timestamp
(pre_vote 和 投票请求的时候会根据这个时间判断是否能投票)if (!from_append_entries_cache) { // Requests from cache already updated timestamp _follower_lease.renew(_leader_id); }
-
根据 request 中的 prev_log_index 从本地获取该 index 对应的 term(local_prev_log_term)。在下面 3 中情况下,返回的 term 为 0:
- prev_log_index 为 0;
- prev_log_index 比本地最新的 log index 大
- prev_log_index 的日志已经被删除了(发生了 snapshot 删除了日志)
-
如果 local_prev_log_term 和 request 中的 term 不匹配,==就设置 success 为 false==,==last_log_index 为本地最新的日志 index==,返回。分为两种情况:
- local_prev_log_term 为 0:表示本地日志没有 prev_log_index
- 日志已经因为 snapshot 删除,说明本地日志比 prev_log_index 还要长:这种情况不可能发生,因为赢得 Leader 选举一定是日志最长的节点
- ==日志复制落后了,还没有收到过 prev_log_index 的日志==
- ==local_prev_log_term 不为 0:说明出现了日志冲突,这部分日志应该被 Leader 的日志覆盖==
const int64_t prev_log_index = request->prev_log_index(); const int64_t prev_log_term = request->prev_log_term(); const int64_t local_prev_log_term = _log_manager->get_term(prev_log_index); if (local_prev_log_term != prev_log_term) { int64_t last_index = _log_manager->last_log_index(); int64_t saved_term = request->term(); int saved_entries_size = request->entries_size(); std::string rpc_server_id = request->server_id(); // 对于空 AppendEntries, handle_out_of_order_append_entries 会直接返回 false if (!from_append_entries_cache && handle_out_of_order_append_entries(cntl, request, response, done, last_index)) { // ... ... return; } response->set_success(false); response->set_term(_current_term); response->set_last_log_index(last_index); lck.unlock(); if (local_prev_log_term != 0) { // LOG } return; }
- local_prev_log_term 为 0:表示本地日志没有 prev_log_index
-
==设置 success 为 true==,==last_log_index 为本地最新的日志 index==,返回
if (request->entries_size() == 0) { response->set_success(true); response->set_term(_current_term); response->set_last_log_index(_log_manager->last_log_index()); response->set_readonly(_node_readonly); lck.unlock(); // see the comments at FollowerStableClosure::run() _ballot_box->set_last_committed_index( std::min(request->committed_index(), prev_log_index)); return; }
AppendEntriesResponse 定义:
message AppendEntriesResponse {
required int64 term = 1;
required bool success = 2;
optional int64 last_log_index = 3;
optional bool readonly = 4;
};
该函数用于空 AppendEntries response 处理和日志复制 response 处理,我们只关心空 AppendEntries 相关的部分。
-
首先从 ReplicatorId 获取对应的 Replicator(在 bthread 中运行的,可能发生了切换)
Replicator *r = NULL; bthread_id_t dummy_id = { id }; const long start_time_us = butil::gettimeofday_us(); if (bthread_id_lock(dummy_id, (void**)&r) != 0) { return; }
-
如果 RPC 立马返回失败,说明 Follower crash 了,需要将 Replicator 阻塞一段时间:
if (cntl->Failed()) { ss << " fail, sleep."; BRAFT_VLOG << ss.str(); // If the follower crashes, any RPC to the follower fails immediately, // so we need to block the follower for a while instead of looping until // it comes back or be removed // dummy_id is unlock in block r->_reset_next_index(); return r->_block(start_time_us, cntl->ErrorCode()); }
-
如果
response->success()
为false
- 如果
response->term()
大于当前 term 的话,让当前节点将 term 提升并退步成 Follower。 - 否则 Follower 的日志跟 prev_log_index 和 prev_log_term 不匹配(参考上一节中分析的日志不匹配的两种情况)
- ==如果 Follower 的日志比 Leader 的日志要少,直接更新 next_index 为
response->last_log_index() + 1
== - ==否则说明 Follower 的日志和 Leader 有冲突,需要截断,递减 next_index==。
- ==如果 Follower 的日志比 Leader 的日志要少,直接更新 next_index 为
- ==最后再发起一次空 AppendEntries RPC,使用新的 next_index 寻找匹配的位置==
if (!response->success()) { if (response->term() > r->_options.term) { r->_reset_next_index(); // 空 AppendEntries 不会更改 next_index, 这里实际没有作用 NodeImpl *node_impl = r->_options.node; // Acquire a reference of Node here in case that Node is destroyed // after _notify_on_caught_up. node_impl->AddRef(); r->_notify_on_caught_up(EPERM, true); butil::Status status; status.set_error(EHIGHERTERMRESPONSE, "Leader receives higher term " "%s from peer:%s", response->GetTypeName().c_str(), r->_options.peer_id.to_string().c_str()); r->_destroy(); node_impl->increase_term_to(response->term(), status); // 该函数会调用 step_down node_impl->Release(); return; } r->_update_last_rpc_send_timestamp(rpc_send_time); // prev_log_index and prev_log_term doesn't match r->_reset_next_index(); if (response->last_log_index() + 1 < r->_next_index) { // The peer contains less logs than leader r->_next_index = response->last_log_index() + 1; } else { // The peer contains logs from old term which should be truncated, // decrease _last_log_at_peer by one to test the right index to keep if (BAIDU_LIKELY(r->_next_index > 1)) { --r->_next_index; } else { } } // dummy_id is unlock in _send_heartbeat r->_send_empty_entries(false); return; }
- 如果
-
接下来的情况说明
response->success()
为true
,日志匹配,调用Replicator::_send_entries()
开始日志复制
上一节说到,在 response::success 为 false 的时候,说明 Follower 的日志和 Leader 不匹配,重新设置 next_index 后再次发起一个空的 AppendEntries。我们再来看一下 Replicator::_send_empty_entries()
的逻辑,在 _fill_common_fields()
失败后会调用 _install_snapshot()
告诉 Follower 需要 Install Snapshot,那么什么情况下会失败呢?
void Replicator::_send_empty_entries(bool is_heartbeat) {
// ... ...
// _next_index 表示下一个要发给 replicator 的 log index,初始值是本地的 last_log_idnex + 1
if (_fill_common_fields(request.get(), _next_index - 1, is_heartbeat) != 0) {
CHECK(!is_heartbeat);
// _id is unlock in _install_snapshot
return _install_snapshot();
}
// ... ...
}
我们现在来看看 Replicator::_fill_common_fields()
的逻辑,在上面我们已经知道了,在 prev_log_index 不为 0 的情况下,get_term()
返回 0 只有两种情况:
- prev_log_index 比本地最新的 log index 大:这种情况不可能,因为 next_index 是递减的或者直接设置为 Follower 返回的 last_log_index,这两个值肯定都比 next_index 的初始值(本地最新 log index + 1)小。
- prev_log_index 的日志已经被删除了(发生了 snapshot 删除了日志)
那么 _fill_common_fields()
返回 -1 的唯一原因就是 prev_log_index 已经被删除了,所以需要 Follower 先 Install Snapshot。
int Replicator::_fill_common_fields(AppendEntriesRequest* request,
int64_t prev_log_index,
bool is_heartbeat) {
const int64_t prev_log_term = _options.log_manager->get_term(prev_log_index);
if (prev_log_term == 0 && prev_log_index != 0) {
if (!is_heartbeat) {
CHECK_LT(prev_log_index, _options.log_manager->first_log_index());
return -1;
} else {
// ... ...
}
}
request->set_term(_options.term);
request->set_group_id(_options.group_id);
request->set_server_id(_options.server_id.to_string());
request->set_peer_id(_options.peer_id.to_string());
request->set_prev_log_index(prev_log_index);
request->set_prev_log_term(prev_log_term);
request->set_committed_index(_options.ballot_box->last_committed_index());
return 0;
}
在 Replicator::start()
里面启动了 heartbeat_timer,它是个 bthread_timer,在超时的时候会调用 Replicator::_on_timedout()
,该函数会把对应的 thread_id 的状态设置为 ETIMEDOUT
:
void Replicator::_on_timedout(void* arg) {
bthread_id_t id = { (uint64_t)arg };
bthread_id_error(id, ETIMEDOUT);
}
void Replicator::_start_heartbeat_timer(long start_time_us) {
const timespec due_time = butil::milliseconds_from(
butil::microseconds_to_timespec(start_time_us),
*_options.dynamic_heartbeat_timeout_ms);
// 这里的 _id 即在 Replicator::start() 中使用 bthread_id_create 创建的 bthread_it,其关联函数为 Replicator::_on_error()
if (bthread_timer_add(&_heartbeat_timer, due_time, _on_timedout, (void*)_id.value) != 0) {
_on_timedout((void*)_id.value);
}
}
bthread_id_error()
会去调用 Replicator::_on_error()
,然后开始 Replicator::_send_heartbeat()
:
int Replicator::_on_error(bthread_id_t id, void* arg, int error_code) {
Replicator* r = (Replicator*)arg;
if (error_code == ESTOP) {
// ... ...
} else if (error_code == ETIMEDOUT) {
// This error is issued in the TimerThread, start a new bthread to avoid
// blocking the caller.
// Unlock id to remove the context-switch out of the critical section
CHECK_EQ(0, bthread_id_unlock(id)) << "Fail to unlock" << id;
bthread_t tid;
if (bthread_start_urgent(&tid, NULL, _send_heartbeat,
reinterpret_cast<void*>(id.value)) != 0) {
PLOG(ERROR) << "Fail to start bthread";
_send_heartbeat(reinterpret_cast<void*>(id.value));
}
return 0;
}
// ... ...
}
关于 bthread_id_create
// ---------------------------------------------------------------------- // Functions to create 64-bit identifiers that can be attached with data // and locked without ABA issues. All functions can be called from // multiple threads simultaneously. Notice that bthread_id_t is designed // for managing a series of non-heavily-contended actions on an object. // It's slower than mutex and not proper for general synchronizations. // ---------------------------------------------------------------------- // Create a bthread_id_t and put it into *id. Crash when `id' is NULL. // id->value will never be zero. // `on_error' will be called after bthread_id_error() is called. // ------------------------------------------------------------------------- // ! User must call bthread_id_unlock() or bthread_id_unlock_and_destroy() // ! inside on_error. // ------------------------------------------------------------------------- // Returns 0 on success, error code otherwise. int bthread_id_create( bthread_id_t* id, void* data, int (*on_error)(bthread_id_t id, void* data, int error_code));
Replicator::_send_heartbeat()
会取调用 Replicator::_send_empty_entries()
发送心跳,因此心跳也是一种特殊的 AppendEntries request。
void Replicator::_send_empty_entries(bool is_heartbeat) {
// ... ...
if (_fill_common_fields(request.get(), _next_index - 1, is_heartbeat) != 0) {
CHECK(!is_heartbeat);
// _id is unlock in _install_snapshot
return _install_snapshot();
}
if (is_heartbeat) {
_heartbeat_in_fly = cntl->call_id();
_heartbeat_counter++;
// set RPC timeout for heartbeat, how long should timeout be is waiting to be optimized.
cntl->set_timeout_ms(*_options.election_timeout_ms / 2);
} else {
// ... ...
}
google::protobuf::Closure* done = brpc::NewCallback(
is_heartbeat ? _on_heartbeat_returned : _on_rpc_returned,
_id.value, cntl.get(), request.get(), response.get(),
butil::monotonic_time_ms());
RaftService_Stub stub(&_sending_channel);
stub.append_entries(cntl.release(), request.release(), response.release(), done);
CHECK_EQ(0, bthread_id_unlock(_id)) << "Fail to unlock " << _id;
}
AppendEntriesRequest 定义:
message AppendEntriesRequest {
required string group_id = 1;
required string server_id = 2;
required string peer_id = 3;
required int64 term = 4;
required int64 prev_log_term = 5;
required int64 prev_log_index = 6;
repeated EntryMeta entries = 7;
required int64 committed_index = 8;
};
NodeImpl::handle_append_entries_request()
函数不仅处理心跳 request(也是个 empty AppendEntries request),也处理日志复制 request 以及前面提到的空 AppendEntries request,在这里我们只关心心跳相关的逻辑(心跳与空 AppendEntries 实际上无法区分)。
-
各种 check
- 如果 request 中的 term 小于自己的 term,则将 response 的 success 设置为 false,term 设置为自己的 term 并返回
- 如果 request 中的 term 大于自己的 term,则 step down
- 如果当前节点的 leader_id 为空,保存 server_id 为 leader_id(这个逻辑应该在第一次发送空的 AppendEntries 中就已经设置了,这里不会为空)
void NodeImpl::check_step_down(const int64_t request_term, const PeerId& server_id) { butil::Status status; if (request_term > _current_term) { status.set_error(ENEWLEADER, "Raft node receives message from new leader with higher term."); step_down(request_term, false, status); } else if (_state != STATE_FOLLOWER) { status.set_error(ENEWLEADER, "Candidate receives message from new leader with the same term."); step_down(request_term, false, status); } else if (_leader_id.is_empty()) { status.set_error(ENEWLEADER, "Follower receives message from new leader with the same term."); step_down(request_term, false, status); } // save current leader if (_leader_id.is_empty()) { reset_leader_id(server_id, status); } }
- 如果收到的 request 不是 Leader 发送的,step down
-
更新
_last_leader_timestamp
(pre_vote 和 投票请求的时候会根据这个时间判断是否能投票)if (!from_append_entries_cache) { // Requests from cache already updated timestamp _follower_lease.renew(_leader_id); }
-
接下来是获取正确的 next_index 的逻辑,该逻辑应该在第一个空的 AppendEntries 中完成,这里我们忽略
-
设置 response 并返回(注意因为无法区分心跳和空 AppendEntries request,这里的逻辑实际上和收到空 AppendEntries request 共用,除了 current_term 外,其他的值我们暂时不用关心)
if (request->entries_size() == 0) { response->set_success(true); response->set_term(_current_term); response->set_last_log_index(_log_manager->last_log_index()); // heartbeat 中未用到 response->set_readonly(_node_readonly); lck.unlock(); // see the comments at FollowerStableClosure::run() _ballot_box->set_last_committed_index(std::min(request->committed_index(), prev_log_index)); // heartbeat 中未用到 return; }
收到心跳回复的处理逻辑很简单,如果 response 里面的 term 大于当前 term,则更新 term 并 step down 到 Follower。否则重启 heartbeat_timer 开始下一轮 heartbeat。
-
Candidate 选举成功成为 Leader 后,首先为每个 Follower 实例化一个 Replicator 对象,用来管理 Follower 的状态;然后启动 Heartbeat 定时器和发送一个空的 AppendEntries RPC 用来表明自己 Leader 的身份和寻找真实的 next_index。
-
next_index 的初始值为本地最新的 log_index + 1
-
next_index 的寻找策略分为两种
- 没有日志冲突,只有日志缺失:直接设置 next_index 为 Follower 返回的最新日志 index
- 有日志冲突,需要递减 next_index 再次尝试
示例如下:
-
==Follower a 与 Leader 数据都是一致的,只是有数据缺失,可以优化为直接通知 Leader 从 logIndex=5 开始进行重传,这样只需一次回溯==。
-
==Follower b 与 Leader有不一致性的数据,需要回溯 7 次才能找到需要进行重传的位置==。