Skip to content

Commit

Permalink
Support negative numbers for batch hint (#128)
Browse files Browse the repository at this point in the history
* Allow the hint size (the preferred size of the next batch of logs to be sent from the leader) to be a negative value

* Fix unit test failure and compile warnings

* Reduce client request timeout value

* Fix unit test error

* [Update PR] Skip immediate catch-up upon hint = -1

Co-authored-by: Derek Li <hli2@ebay.com>
Co-authored-by: Jung-Sang Ahn <jungsang.ahn@gmail.com>
  • Loading branch information
3 people authored Aug 6, 2020
1 parent 6d371dd commit 6eb66c5
Show file tree
Hide file tree
Showing 11 changed files with 68 additions and 31 deletions.
8 changes: 6 additions & 2 deletions examples/in_memory_log_store.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,15 @@ ptr< std::vector< ptr<log_entry> > >
ptr<std::vector<ptr<log_entry>>>
inmem_log_store::log_entries_ext(ulong start,
ulong end,
ulong batch_size_hint_in_bytes)
int64 batch_size_hint_in_bytes)
{
ptr< std::vector< ptr<log_entry> > > ret =
cs_new< std::vector< ptr<log_entry> > >();

if (batch_size_hint_in_bytes < 0) {
return ret;
}

size_t accum_size = 0;
for (ulong ii = start ; ii < end ; ++ii) {
ptr<log_entry> src = nullptr;
Expand All @@ -128,7 +132,7 @@ ptr<std::vector<ptr<log_entry>>>
ret->push_back(make_clone(src));
accum_size += src->get_buf().size();
if (batch_size_hint_in_bytes &&
accum_size >= batch_size_hint_in_bytes) break;
accum_size >= (ulong)batch_size_hint_in_bytes) break;
}
return ret;
}
Expand Down
2 changes: 1 addition & 1 deletion examples/in_memory_log_store.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public:
ptr<std::vector<ptr<log_entry>>> log_entries(ulong start, ulong end);

ptr<std::vector<ptr<log_entry>>> log_entries_ext(
ulong start, ulong end, ulong batch_size_hint_in_bytes = 0);
ulong start, ulong end, int64 batch_size_hint_in_bytes = 0);

ptr<log_entry> entry_at(ulong index);

Expand Down
1 change: 1 addition & 0 deletions include/libnuraft/basic_types.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ limitations under the License.
#include <cstdint>

typedef uint64_t ulong;
typedef int64_t int64;
typedef void* any_ptr;
typedef unsigned char byte;
typedef uint16_t ushort;
Expand Down
8 changes: 4 additions & 4 deletions include/libnuraft/log_store.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -102,20 +102,20 @@ public:
* Get log entries with index [start, end).
*
* The total size of the returned entries is limited by batch_size_hint.
* Entries near the end are not returned if the total size of the entries exceeds
* batch_size_hint.
*
* Return nullptr to indicate error if any log entry within the requested range
* could not be retrieved (e.g. due to external log truncation).
*
* @param start The start log index number (inclusive).
* @param end The end log index number (exclusive).
* @param batch_size_hint_in_bytes total size (in bytes) of the returned entries
* @param batch_size_hint_in_bytes Total size (in bytes) of the returned entries,
* see the detailed comment at
* `state_machine::get_next_batch_size_hint_in_bytes()`.
* @return The log entries between [start, end) and limited by the total size
* given by the batch_size_hint_in_bytes.
*/
virtual ptr<std::vector<ptr<log_entry>>> log_entries_ext(
ulong start, ulong end, ulong batch_size_hint_in_bytes = 0) {
ulong start, ulong end, int64 batch_size_hint_in_bytes = 0) {
return log_entries(start, end);
}

Expand Down
6 changes: 3 additions & 3 deletions include/libnuraft/peer.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,11 @@ public:
next_log_idx_ = idx;
}

ulong get_next_batch_size_hint_in_bytes() const {
int64 get_next_batch_size_hint_in_bytes() const {
return next_batch_size_hint_in_bytes_;
}

void set_next_batch_size_hint_in_bytes(ulong batch_size) {
void set_next_batch_size_hint_in_bytes(int64 batch_size) {
next_batch_size_hint_in_bytes_ = batch_size;
}

Expand Down Expand Up @@ -346,7 +346,7 @@ private:
/**
* Hint of the next log batch size in bytes.
*/
std::atomic<ulong> next_batch_size_hint_in_bytes_;
std::atomic<int64> next_batch_size_hint_in_bytes_;

/**
* The last log index whose term matches up with the leader.
Expand Down
6 changes: 3 additions & 3 deletions include/libnuraft/resp_msg.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ public:
return next_idx_;
}

ulong get_next_batch_size_hint_in_bytes() const {
int64 get_next_batch_size_hint_in_bytes() const {
return next_batch_size_hint_in_bytes_;
}

void set_next_batch_size_hint_in_bytes(ulong bytes) {
void set_next_batch_size_hint_in_bytes(int64 bytes) {
next_batch_size_hint_in_bytes_ = bytes;
}

Expand Down Expand Up @@ -128,7 +128,7 @@ public:

private:
ulong next_idx_;
ulong next_batch_size_hint_in_bytes_;
int64 next_batch_size_hint_in_bytes_;
bool accepted_;
ptr<buffer> ctx_;
ptr<peer> peer_;
Expand Down
11 changes: 7 additions & 4 deletions include/libnuraft/state_machine.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,16 @@ public:
* Return a hint about the preferred size (in number of bytes)
* of the next batch of logs to be sent from the leader.
*
* Return 0 to indicate no preferred size (any size is good).
*
* Only applicable on followers.
*
* @return the preferred size of the next log batch
* @return The preferred size of the next log batch.
* `0` indicates no preferred size (any size is good).
* `positive value` indicates at least one log can be sent,
* (the size of that log may be bigger than this hint size).
* `negative value` indicates no log should be sent since this
* follower is busy handling pending logs.
*/
virtual ulong get_next_batch_size_hint_in_bytes() { return 0; }
virtual int64 get_next_batch_size_hint_in_bytes() { return 0; }

/**
* (Deprecated)
Expand Down
6 changes: 3 additions & 3 deletions src/asio_service.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ class rpc_session : public std::enable_shared_from_this<rpc_session> {
// Hint is given, set the flag.
flags |= INCLUDE_HINT;
// For future extension, we will put 2-byte version and 2-byte length.
resp_hint_size += sizeof(uint16_t) * 2 + sizeof(ulong);
resp_hint_size += sizeof(uint16_t) * 2 + sizeof(int64);
}

size_t carried_data_size = resp_meta_size + resp_hint_size + resp_ctx_size;
Expand Down Expand Up @@ -610,7 +610,7 @@ class rpc_session : public std::enable_shared_from_this<rpc_session> {
const uint16_t CUR_HINT_VERSION = 0;
bs.put_u16(CUR_HINT_VERSION);
bs.put_u16(sizeof(ulong));
bs.put_u64(resp->get_next_batch_size_hint_in_bytes());
bs.put_i64(resp->get_next_batch_size_hint_in_bytes());
}

if (resp_ctx_size) {
Expand Down Expand Up @@ -1377,7 +1377,7 @@ class asio_rpc_client
uint16_t hint_version = bs.get_u16();
(void)hint_version;
hint_len = bs.get_u16();
rsp->set_next_batch_size_hint_in_bytes(bs.get_u64());
rsp->set_next_batch_size_hint_in_bytes(bs.get_i64());
remaining_len -= sizeof(uint16_t) * 2 + hint_len;
}

Expand Down
21 changes: 12 additions & 9 deletions src/handle_append_entries.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -760,9 +760,9 @@ ptr<resp_msg> raft_server::handle_append_entries(req_msg& req)
restart_election_timer();
}

ulong bs_hint = state_machine_->get_next_batch_size_hint_in_bytes();
int64 bs_hint = state_machine_->get_next_batch_size_hint_in_bytes();
resp->set_next_batch_size_hint_in_bytes(bs_hint);
p_tr("batch size hint: %zu bytes", bs_hint);
p_tr("batch size hint: %ld bytes", bs_hint);

out_of_log_range_ = false;

Expand Down Expand Up @@ -799,8 +799,8 @@ void raft_server::handle_append_entries_resp(resp_msg& resp) {
p_tr("handle append entries resp (from %d), resp.get_next_idx(): %d\n",
(int)p->get_id(), (int)resp.get_next_idx());

ulong bs_hint = resp.get_next_batch_size_hint_in_bytes();
p_tr("peer %d batch size hint: %zu bytes", p->get_id(), bs_hint);
int64 bs_hint = resp.get_next_batch_size_hint_in_bytes();
p_tr("peer %d batch size hint: %ld bytes", p->get_id(), bs_hint);
p->set_next_batch_size_hint_in_bytes(bs_hint);

if (resp.get_accepted()) {
Expand All @@ -811,7 +811,7 @@ void raft_server::handle_append_entries_resp(resp_msg& resp) {
p->set_next_log_idx(resp.get_next_idx());
prev_matched_idx = p->get_matched_idx();
new_matched_idx = resp.get_next_idx() - 1;
p_tr("peer %d, prev idx: %ld, next idx: %ld",
p_tr("peer %d, prev matched idx: %ld, new matched idx: %ld",
p->get_id(), prev_matched_idx, new_matched_idx);
p->set_matched_idx(new_matched_idx);
}
Expand All @@ -827,10 +827,6 @@ void raft_server::handle_append_entries_resp(resp_msg& resp) {
need_to_catchup = p->clear_pending_commit() ||
resp.get_next_idx() < log_store_->next_slot();

if (srv_to_leave_ && srv_to_leave_->get_id() == p->get_id()) {

}

} else {
ulong prev_next_log = p->get_next_log_idx();
std::lock_guard<std::mutex> guard(p->get_lock());
Expand Down Expand Up @@ -915,6 +911,13 @@ void raft_server::handle_append_entries_resp(resp_msg& resp) {
return;
}

if (bs_hint < 0) {
// If hint is a negative number, we should set `need_to_catchup`
// to `false` to avoid sending meaningless messages continuously
// which eats up CPU. Then the leader will send heartbeats only.
need_to_catchup = false;
}

// This may not be a leader anymore,
// such as the response was sent out long time ago
// and the role was updated by UpdateTerm call
Expand Down
28 changes: 27 additions & 1 deletion tests/unit/asio_service_test.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ int response_hint_test(bool with_meta) {
}
CHK_Z( launch_servers(pkgs, false) );

_msg("enable batch size hint\n");
_msg("enable batch size hint with positive value\n");
for (RaftAsioPkg* ee: pkgs) {
ee->getTestSm()->set_next_batch_size_hint_in_bytes(1);
}
Expand Down Expand Up @@ -657,6 +657,32 @@ int response_hint_test(bool with_meta) {
CHK_OK( s2.getTestSm()->isSame( *s1.getTestSm() ) );
CHK_OK( s3.getTestSm()->isSame( *s1.getTestSm() ) );

_msg("enable batch size hint with negative value\n");
for (RaftAsioPkg* ee: pkgs) {
ee->getTestSm()->set_next_batch_size_hint_in_bytes(-1);
}

TestSuite::sleep_sec(1, "wait peer's hint size info refreshed in leader side");

// With a negative hint size, append_entries will time out because the
// Raft server cannot commit. Set the timeout to a small value.
raft_params params = s1.raftServer->get_current_params();
params.with_client_req_timeout(1000);
s1.raftServer->update_params(params);

for (size_t ii=0; ii<3; ++ii) {
std::string msg_str = "3rd_" + std::to_string(ii);
ptr<buffer> msg = buffer::alloc(sizeof(uint32_t) + msg_str.size());
buffer_serializer bs(msg);
bs.put_str(msg_str);
s1.raftServer->append_entries( {msg} );
}
TestSuite::sleep_sec(1, "wait for replication but actually no replication happen");

// State machine should be identical. All are not committed.
CHK_OK( s2.getTestSm()->isSame( *s1.getTestSm() ) );
CHK_OK( s3.getTestSm()->isSame( *s1.getTestSm() ) );

if (with_meta) {
// Callback functions for meta should have been called.
CHK_GT(read_req_cb_count.load(), 0);
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/raft_functional_common.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public:
customBatchSize = to;
}

ulong get_next_batch_size_hint_in_bytes() {
int64 get_next_batch_size_hint_in_bytes() {
return customBatchSize;
}

Expand Down

0 comments on commit 6eb66c5

Please sign in to comment.