Skip to content

Latest commit

 

History

History
556 lines (452 loc) · 24 KB

2. Next Index 寻找和 HeartBeat.md

File metadata and controls

556 lines (452 loc) · 24 KB

在节点赢得选举后,就会调用 NodeImpl::become_leader() 进行接下来的工作。

NodeImpl::become_leader()

become_leader 主要做了如下工作:

  1. 停止 VoteTimer
  2. _state 设置为 Leader,_leader_id 设置为自己。
  3. _replicator_group 的 term 设置为当前 term,并把其他 peer 添加到 _replicator_group 里面。添加的时候会给每个 peer 分配一个 Replicator,并调用 Replicator::start()
void NodeImpl::become_leader() {
    CHECK(_state == STATE_CANDIDATE);
    // cancel candidate vote timer
    _vote_timer.stop();
    _vote_ctx.reset(this);

    _state = STATE_LEADER;
    _leader_id = _server_id;

    _replicator_group.reset_term(_current_term);
    _follower_lease.reset();
    _leader_lease.on_leader_start(_current_term);

    std::set<PeerId> peers;
    _conf.list_peers(&peers);
    for (std::set<PeerId>::const_iterator
            iter = peers.begin(); iter != peers.end(); ++iter) {
        if (*iter == _server_id) {
            continue;
        }

        //TODO: check return code
        _replicator_group.add_replicator(*iter);
    }

    // init commit manager
    _ballot_box->reset_pending_index(_log_manager->last_log_index() + 1);

    // Register _conf_ctx to reject configuration changing before the first log
    // is committed.
    CHECK(!_conf_ctx.is_busy());
    _conf_ctx.flush(_conf.conf, _conf.old_conf);
    _stepdown_timer.start();
}

Replicator::start()

每个 Follower 对应一个 Replicator,负责与该 Follower 的日志复制、Snapshot Install 等工作。

  1. ==初始化 _next_index 为最新的 log index + 1==(后面会通过第一个空的 AppendEntries RPC 来寻找正确的 next index)

        r->_next_index = r->_options.log_manager->last_log_index() + 1;
  2. 启动 heartbeat_timer

  3. 发送空的 AppendEntries RPC 通知 Follower 自己的 Leader 的身份和进行 Log Index 的寻找,其他节点收到第一个空的 entry 就会回退成 Follower 并把自己的 leader_id 设置成 request 里面包含的 server_id。

    if (bthread_id_create(&r->_id, r, _on_error) != 0) {
        LOG(ERROR) << "Fail to create bthread_id"
                   << ", group " << options.group_id;
        delete r;
        return -1;
    }

    r->_update_last_rpc_send_timestamp(butil::monotonic_time_ms());
    r->_start_heartbeat_timer(butil::gettimeofday_us());
    // Note: r->_id is unlock in _send_empty_entries, don't touch r ever after
    r->_send_empty_entries(false); // is_heartbeats = false

Tips:

  • Replicator::_next_inde 表示下一个要发给该 Follower 的 Log index,初始值为当前最新的 log index + 1,然后在第一次 append entries 的时候通过 Follower 的 response 进行调整,找到 Follower 实际需要的下一个 log index。
  • empty AppendEntries request 有两个作用:
    1. 向 Follower 表明自己 Leader 的身份,Follower 会更新自己的 Leader_id 信息
    2. 寻找真正的 _next_index

寻找 next_index

Leader 会为每个 Follower 维护下一个将要发送的 log index,即 Replicator::_next_index,在一个节点成为 Leader 时,它并不知道每个 Follower 的日志复制情况,所以初始化为了自己本地最新的 log index + 1,然后通过发送一个空的 AppendEntries 来寻找 Follower 真正需要发送的 next_index。

发送空 AppendEntries (Replicator::_send_empty_entries(false))

发送心跳也会使用 Replicator::_send_empty_entries(bool is_heartbeat) 函数,区别是这里的参数 is_heartbeatfalse。忽略 heartbeat 相关的代码,简化后的代码如下:

void Replicator::_send_empty_entries(bool is_heartbeat) {
    // ... ...
    // _next_index 表示下一个要发给 replicator 的 log index,初始值是本地的 last_log_idnex + 1
    if (_fill_common_fields(request.get(), _next_index - 1, is_heartbeat) != 0) {
        CHECK(!is_heartbeat);
        // _id is unlock in _install_snapshot
        return _install_snapshot();
    }
    if (is_heartbeat) {
        // ... ...
    } else {
        _st.st = APPENDING_ENTRIES;
        _st.first_log_index = _next_index;
        _st.last_log_index = _next_index - 1;
        CHECK(_append_entries_in_fly.empty());
        CHECK_EQ(_flying_append_entries_size, 0);
        _append_entries_in_fly.push_back(FlyingAppendEntriesRpc(_next_index, 0, cntl->call_id()));
        _append_entries_counter++;
    }

    google::protobuf::Closure* done = brpc::NewCallback(
                is_heartbeat ? _on_heartbeat_returned : _on_rpc_returned, // 在当前场景下回调函数是 _on_rpc_returned
                _id.value, cntl.get(), request.get(), response.get(),
                butil::monotonic_time_ms());

    RaftService_Stub stub(&_sending_channel);
    stub.append_entries(cntl.release(), request.release(), response.release(), done);
    CHECK_EQ(0, bthread_id_unlock(_id)) << "Fail to unlock " << _id;
}
  1. Replicator::_fill_common_fields() 用于填充 AppendEntriesRequest,第一次的 _fill_common_fields 一定是会成功的,我们先忽略失败的情况。
  2. Replicator::_append_entries_in_fly 中记录下当前已发送的 rpc 的信息,==注意这里 size 是 0==;并递增 Replicator::_append_entries_counter,这两个值在后面收到 response 后会用到。
  3. 发送 RPC,其回调是 Replicator::_on_rpc_returned()

AppendEntriesRequest 定义

message AppendEntriesRequest {
    required string group_id = 1;
    required string server_id = 2;
    required string peer_id = 3;
    required int64 term = 4;
    required int64 prev_log_term = 5;
    required int64 prev_log_index = 6;
    repeated EntryMeta entries = 7;
    required int64 committed_index = 8;
};

收到空 AppendEntries RPC (NodeImpl::handle_append_entries_request())

NodeImpl::handle_append_entries_request() 函数处理所有的 AppendEntries RPC,包括空 AppendEntries、Heartbeat 和 日志复制,在这里我们只关心空 AppendEntries 相关的逻辑(实际上 Heartbeat 与空 AppendEntries 无法区分,entries_size 都是 0)。

  1. 各种 check

    • 如果 request 中的 term 小于自己的 term,则将 response 的 success 设置为 false,term 设置为自己的 term 并返回

    • 如果 request 中的 term 大于自己的 term,则 step down

  2. 如果当前节点的 leader_id 为空,保存 server_idleader_id

  3. 更新 _last_leader_timestamp(pre_vote 和 投票请求的时候会根据这个时间判断是否能投票)

        if (!from_append_entries_cache) {
            // Requests from cache already updated timestamp
            _follower_lease.renew(_leader_id);
        }
  4. 根据 request 中的 prev_log_index 从本地获取该 index 对应的 term(local_prev_log_term)。在下面 3 中情况下,返回的 term 为 0:

    • prev_log_index 为 0;
    • prev_log_index 比本地最新的 log index 大
    • prev_log_index 的日志已经被删除了(发生了 snapshot 删除了日志)
  5. 如果 local_prev_log_term 和 request 中的 term 不匹配,==就设置 success 为 false==,==last_log_index 为本地最新的日志 index==,返回。分为两种情况:

    • local_prev_log_term 为 0:表示本地日志没有 prev_log_index
      • 日志已经因为 snapshot 删除,说明本地日志比 prev_log_index 还要长:这种情况不可能发生,因为赢得 Leader 选举一定是日志最长的节点
      • ==日志复制落后了,还没有收到过 prev_log_index 的日志==
    • ==local_prev_log_term 不为 0:说明出现了日志冲突,这部分日志应该被 Leader 的日志覆盖==
        const int64_t prev_log_index = request->prev_log_index();
        const int64_t prev_log_term = request->prev_log_term();
        const int64_t local_prev_log_term = _log_manager->get_term(prev_log_index);
        if (local_prev_log_term != prev_log_term) {
            int64_t last_index = _log_manager->last_log_index();
            int64_t saved_term = request->term();
            int     saved_entries_size = request->entries_size();
            std::string rpc_server_id = request->server_id();
            // 对于空 AppendEntries, handle_out_of_order_append_entries 会直接返回 false
            if (!from_append_entries_cache &&
                handle_out_of_order_append_entries(cntl, request, response, done, last_index)) {
                // ... ...
                return;
            }
    
            response->set_success(false);
            response->set_term(_current_term);
            response->set_last_log_index(last_index);
            lck.unlock();
            if (local_prev_log_term != 0) {
                // LOG
            }
            return;
        }
  6. ==设置 success 为 true==,==last_log_index 为本地最新的日志 index==,返回

        if (request->entries_size() == 0) {
            response->set_success(true);
            response->set_term(_current_term);
            response->set_last_log_index(_log_manager->last_log_index());
            response->set_readonly(_node_readonly);
            lck.unlock();
            // see the comments at FollowerStableClosure::run()
            _ballot_box->set_last_committed_index(
                    std::min(request->committed_index(),
                             prev_log_index));
            return;
        }

AppendEntriesResponse 定义

message AppendEntriesResponse {
    required int64 term = 1;
    required bool success = 2;
    optional int64 last_log_index = 3;
    optional bool readonly = 4;
};

收到空 AppendEntries response (Replicator::_on_rpc_returned())

该函数用于空 AppendEntries response 处理和日志复制 response 处理,我们只关心空 AppendEntries 相关的部分。

  1. 首先从 ReplicatorId 获取对应的 Replicator(在 bthread 中运行的,可能发生了切换)

        Replicator *r = NULL;
        bthread_id_t dummy_id = { id };
        const long start_time_us = butil::gettimeofday_us();
        if (bthread_id_lock(dummy_id, (void**)&r) != 0) {
            return;
        }
  2. 如果 RPC 立马返回失败,说明 Follower crash 了,需要将 Replicator 阻塞一段时间:

        if (cntl->Failed()) {
            ss << " fail, sleep.";
            BRAFT_VLOG << ss.str();
            // If the follower crashes, any RPC to the follower fails immediately,
            // so we need to block the follower for a while instead of looping until
            // it comes back or be removed
            // dummy_id is unlock in block
            r->_reset_next_index();
            return r->_block(start_time_us, cntl->ErrorCode());
        }
  3. 如果 response->success()false

    • 如果 response->term() 大于当前 term 的话,让当前节点将 term 提升并退步成 Follower。
    • 否则 Follower 的日志跟 prev_log_index 和 prev_log_term 不匹配(参考上一节中分析的日志不匹配的两种情况)
      • ==如果 Follower 的日志比 Leader 的日志要少,直接更新 next_index 为 response->last_log_index() + 1==
      • ==否则说明 Follower 的日志和 Leader 有冲突,需要截断,递减 next_index==。
    • ==最后再发起一次空 AppendEntries RPC,使用新的 next_index 寻找匹配的位置==
        if (!response->success()) {
            if (response->term() > r->_options.term) {
                r->_reset_next_index(); // 空 AppendEntries 不会更改 next_index, 这里实际没有作用
    
                NodeImpl *node_impl = r->_options.node;
                // Acquire a reference of Node here in case that Node is destroyed
                // after _notify_on_caught_up.
                node_impl->AddRef();
                r->_notify_on_caught_up(EPERM, true);
                butil::Status status;
                status.set_error(EHIGHERTERMRESPONSE, "Leader receives higher term "
                        "%s from peer:%s", response->GetTypeName().c_str(), r->_options.peer_id.to_string().c_str());
                r->_destroy();
                node_impl->increase_term_to(response->term(), status); // 该函数会调用 step_down
                node_impl->Release();
                return;
            }
    
            r->_update_last_rpc_send_timestamp(rpc_send_time);
            // prev_log_index and prev_log_term doesn't match
            r->_reset_next_index();
            if (response->last_log_index() + 1 < r->_next_index) {
                // The peer contains less logs than leader
                r->_next_index = response->last_log_index() + 1;
            } else {  
                // The peer contains logs from old term which should be truncated,
                // decrease _last_log_at_peer by one to test the right index to keep
                if (BAIDU_LIKELY(r->_next_index > 1)) {
                    --r->_next_index;
                } else {
                    
                }
            }
            // dummy_id is unlock in _send_heartbeat
            r->_send_empty_entries(false);
            return;
        }
  4. 接下来的情况说明 response->success()true,日志匹配,调用 Replicator::_send_entries() 开始日志复制

再次发送空 AppendEntries

上一节说到,在 response::success 为 false 的时候,说明 Follower 的日志和 Leader 不匹配,重新设置 next_index 后再次发起一个空的 AppendEntries。我们再来看一下 Replicator::_send_empty_entries() 的逻辑,在 _fill_common_fields() 失败后会调用 _install_snapshot() 告诉 Follower 需要 Install Snapshot,那么什么情况下会失败呢?

void Replicator::_send_empty_entries(bool is_heartbeat) {
    // ... ...
    // _next_index 表示下一个要发给 replicator 的 log index,初始值是本地的 last_log_idnex + 1
    if (_fill_common_fields(request.get(), _next_index - 1, is_heartbeat) != 0) {
        CHECK(!is_heartbeat);
        // _id is unlock in _install_snapshot
        return _install_snapshot();
    }
    // ... ...
}

我们现在来看看 Replicator::_fill_common_fields() 的逻辑,在上面我们已经知道了,在 prev_log_index 不为 0 的情况下,get_term() 返回 0 只有两种情况:

  1. prev_log_index 比本地最新的 log index 大:这种情况不可能,因为 next_index 是递减的或者直接设置为 Follower 返回的 last_log_index,这两个值肯定都比 next_index 的初始值(本地最新 log index + 1)小。
  2. prev_log_index 的日志已经被删除了(发生了 snapshot 删除了日志)

那么 _fill_common_fields() 返回 -1 的唯一原因就是 prev_log_index 已经被删除了,所以需要 Follower 先 Install Snapshot。

int Replicator::_fill_common_fields(AppendEntriesRequest* request, 
                                    int64_t prev_log_index,
                                    bool is_heartbeat) {
    const int64_t prev_log_term = _options.log_manager->get_term(prev_log_index);
    if (prev_log_term == 0 && prev_log_index != 0) {
        if (!is_heartbeat) {
            CHECK_LT(prev_log_index, _options.log_manager->first_log_index());
            return -1;
        } else {
            // ... ...
        }
    }
    request->set_term(_options.term);
    request->set_group_id(_options.group_id);
    request->set_server_id(_options.server_id.to_string());
    request->set_peer_id(_options.peer_id.to_string());
    request->set_prev_log_index(prev_log_index);
    request->set_prev_log_term(prev_log_term);
    request->set_committed_index(_options.ballot_box->last_committed_index());
    return 0;
}

心跳

心跳 Timer

Replicator::start() 里面启动了 heartbeat_timer,它是个 bthread_timer,在超时的时候会调用 Replicator::_on_timedout() ,该函数会把对应的 thread_id 的状态设置为 ETIMEDOUT

void Replicator::_on_timedout(void* arg) {
    bthread_id_t id = { (uint64_t)arg };
    bthread_id_error(id, ETIMEDOUT);
}

void Replicator::_start_heartbeat_timer(long start_time_us) {
    const timespec due_time = butil::milliseconds_from(
            butil::microseconds_to_timespec(start_time_us), 
            *_options.dynamic_heartbeat_timeout_ms);
    // 这里的 _id 即在 Replicator::start() 中使用 bthread_id_create 创建的 bthread_it,其关联函数为 Replicator::_on_error()
    if (bthread_timer_add(&_heartbeat_timer, due_time, _on_timedout, (void*)_id.value) != 0) {
        _on_timedout((void*)_id.value);
    }
}

bthread_id_error() 会去调用 Replicator::_on_error(),然后开始 Replicator::_send_heartbeat():

int Replicator::_on_error(bthread_id_t id, void* arg, int error_code) {
    Replicator* r = (Replicator*)arg;
    if (error_code == ESTOP) {
        // ... ...
    } else if (error_code == ETIMEDOUT) {
        // This error is issued in the TimerThread, start a new bthread to avoid
        // blocking the caller.
        // Unlock id to remove the context-switch out of the critical section
        CHECK_EQ(0, bthread_id_unlock(id)) << "Fail to unlock" << id;
        bthread_t tid;
        if (bthread_start_urgent(&tid, NULL, _send_heartbeat,
                                 reinterpret_cast<void*>(id.value)) != 0) {
            PLOG(ERROR) << "Fail to start bthread";
            _send_heartbeat(reinterpret_cast<void*>(id.value));
        }
        return 0;
    }
    // ... ...
}

关于 bthread_id_create

// ----------------------------------------------------------------------
// Functions to create 64-bit identifiers that can be attached with data
// and locked without ABA issues. All functions can be called from
// multiple threads simultaneously. Notice that bthread_id_t is designed
// for managing a series of non-heavily-contended actions on an object.
// It's slower than mutex and not proper for general synchronizations.
// ----------------------------------------------------------------------

// Create a bthread_id_t and put it into *id. Crash when `id' is NULL.
// id->value will never be zero.
// `on_error' will be called after bthread_id_error() is called.
// -------------------------------------------------------------------------
// ! User must call bthread_id_unlock() or bthread_id_unlock_and_destroy()
// ! inside on_error.
// -------------------------------------------------------------------------
// Returns 0 on success, error code otherwise.
int bthread_id_create(
    bthread_id_t* id, void* data,
    int (*on_error)(bthread_id_t id, void* data, int error_code));

发送心跳 (Replicator::_send_empty_entries(true))

Replicator::_send_heartbeat() 会取调用 Replicator::_send_empty_entries() 发送心跳,因此心跳也是一种特殊的 AppendEntries request。

void Replicator::_send_empty_entries(bool is_heartbeat) {
    // ... ...
    if (_fill_common_fields(request.get(), _next_index - 1, is_heartbeat) != 0) {
        CHECK(!is_heartbeat);
        // _id is unlock in _install_snapshot
        return _install_snapshot();
    }
    if (is_heartbeat) {
        _heartbeat_in_fly = cntl->call_id();
        _heartbeat_counter++;
        // set RPC timeout for heartbeat, how long should timeout be is waiting to be optimized.
        cntl->set_timeout_ms(*_options.election_timeout_ms / 2);
    } else {
        // ... ...
    }

    google::protobuf::Closure* done = brpc::NewCallback(
                is_heartbeat ? _on_heartbeat_returned : _on_rpc_returned, 
                _id.value, cntl.get(), request.get(), response.get(),
                butil::monotonic_time_ms());

    RaftService_Stub stub(&_sending_channel);
    stub.append_entries(cntl.release(), request.release(), response.release(), done);
    CHECK_EQ(0, bthread_id_unlock(_id)) << "Fail to unlock " << _id;
}

AppendEntriesRequest 定义:

message AppendEntriesRequest {
    required string group_id = 1;
    required string server_id = 2;
    required string peer_id = 3;
    required int64 term = 4;
    required int64 prev_log_term = 5;
    required int64 prev_log_index = 6;
    repeated EntryMeta entries = 7;
    required int64 committed_index = 8;
};

收到心跳 (NodeImpl::handle_append_entries_request())

NodeImpl::handle_append_entries_request() 函数不仅处理心跳 request(也是个 empty AppendEntries request),也处理日志复制 request 以及前面提到的空 AppendEntries request,在这里我们只关心心跳相关的逻辑(心跳与空 AppendEntries 实际上无法区分)。

  1. 各种 check

    • 如果 request 中的 term 小于自己的 term,则将 response 的 success 设置为 false,term 设置为自己的 term 并返回
    • 如果 request 中的 term 大于自己的 term,则 step down
    • 如果当前节点的 leader_id 为空,保存 server_id 为 leader_id(这个逻辑应该在第一次发送空的 AppendEntries 中就已经设置了,这里不会为空)
    void NodeImpl::check_step_down(const int64_t request_term, const PeerId& server_id) {
        butil::Status status;
        if (request_term > _current_term) {
            status.set_error(ENEWLEADER, "Raft node receives message from new leader with higher term."); 
            step_down(request_term, false, status);
        } else if (_state != STATE_FOLLOWER) { 
            status.set_error(ENEWLEADER, "Candidate receives message from new leader with the same term.");
            step_down(request_term, false, status);
        } else if (_leader_id.is_empty()) {
            status.set_error(ENEWLEADER, "Follower receives message from new leader with the same term.");
            step_down(request_term, false, status); 
        }
        // save current leader
        if (_leader_id.is_empty()) { 
            reset_leader_id(server_id, status);
        }
    }
    • 如果收到的 request 不是 Leader 发送的,step down
  2. 更新 _last_leader_timestamp(pre_vote 和 投票请求的时候会根据这个时间判断是否能投票)

        if (!from_append_entries_cache) {
            // Requests from cache already updated timestamp
            _follower_lease.renew(_leader_id);
        }
  3. 接下来是获取正确的 next_index 的逻辑,该逻辑应该在第一个空的 AppendEntries 中完成,这里我们忽略

  4. 设置 response 并返回(注意因为无法区分心跳和空 AppendEntries request,这里的逻辑实际上和收到空 AppendEntries request 共用,除了 current_term 外,其他的值我们暂时不用关心)

        if (request->entries_size() == 0) {
            response->set_success(true);
            response->set_term(_current_term);
            response->set_last_log_index(_log_manager->last_log_index()); // heartbeat 中未用到
            response->set_readonly(_node_readonly);
            lck.unlock();
            // see the comments at FollowerStableClosure::run()
            _ballot_box->set_last_committed_index(std::min(request->committed_index(), prev_log_index)); // heartbeat 中未用到
            return;
        }

收到心跳响应 (Replicator::_on_heartbeat_returned())

收到心跳回复的处理逻辑很简单,如果 response 里面的 term 大于当前 term,则更新 term 并 step down 到 Follower。否则重启 heartbeat_timer 开始下一轮 heartbeat。

Summary

  • Candidate 选举成功成为 Leader 后,首先为每个 Follower 实例化一个 Replicator 对象,用来管理 Follower 的状态;然后启动 Heartbeat 定时器和发送一个空的 AppendEntries RPC 用来表明自己 Leader 的身份和寻找真实的 next_index。

  • next_index 的初始值为本地最新的 log_index + 1

  • next_index 的寻找策略分为两种

    • 没有日志冲突,只有日志缺失:直接设置 next_index 为 Follower 返回的最新日志 index
    • 有日志冲突,需要递减 next_index 再次尝试

    示例如下:

    img
    • ==Follower a 与 Leader 数据都是一致的,只是有数据缺失,可以优化为直接通知 Leader 从 logIndex=5 开始进行重传,这样只需一次回溯==。

    • ==Follower b 与 Leader有不一致性的数据,需要回溯 7 次才能找到需要进行重传的位置==。