Skip to content

Commit

Permalink
Add optional timeout to auto_forwarded requests (#191)
Browse files Browse the repository at this point in the history
* Merge pull request #5 from ClickHouse-Extras/add_timeout_for_request_forwarding

Add timeouts to requests autoforwarding

(cherry picked from commit 7adf7ae)

* Setting value zero

* Fix build

* Bugfixes

* Fix test
  • Loading branch information
alesapin authored Apr 5, 2021
1 parent 2df09cc commit 026cf96
Show file tree
Hide file tree
Showing 8 changed files with 168 additions and 14 deletions.
19 changes: 19 additions & 0 deletions include/libnuraft/raft_params.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ struct raft_params {
, auto_adjust_quorum_for_small_cluster_(false)
, locking_method_type_(dual_mutex)
, return_method_(blocking)
, auto_forwarding_req_timeout_(0)
{}

/**
Expand Down Expand Up @@ -310,6 +311,18 @@ struct raft_params {
return *this;
}

/**
 * Configure how long a request that was auto-forwarded to the leader
 * may wait for its response.
 *
 * A value of 0 (the default) means no timeout is applied.
 *
 * @param timeout_ms Timeout in milliseconds.
 * @return self
 */
raft_params& with_auto_forwarding_req_timeout(int32 timeout_ms) {
    this->auto_forwarding_req_timeout_ = timeout_ms;
    return *this;
}


/**
* Return heartbeat interval.
* If given heartbeat interval is smaller than a specific value
Expand Down Expand Up @@ -488,6 +501,12 @@ public:
* To choose blocking call or asynchronous call.
*/
return_method_type return_method_;

/**
 * Time in milliseconds to wait for a response after forwarding
 * a request to the leader. 0 (the default) means no timeout is applied.
 * Must be larger than client_req_timeout_.
 */
int32 auto_forwarding_req_timeout_;
};

}
Expand Down
4 changes: 3 additions & 1 deletion include/libnuraft/rpc_cli.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@ class rpc_client {
__interface_body__(rpc_client);

public:
virtual void send(ptr<req_msg>& req, rpc_handler& when_done) = 0;
virtual void send(ptr<req_msg>& req, rpc_handler& when_done, uint64_t send_timeout_ms = 0) = 0;

virtual uint64_t get_id() const = 0;

virtual bool is_abandoned() const = 0;
};

}
Expand Down
50 changes: 45 additions & 5 deletions src/asio_service.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -828,6 +828,7 @@ class asio_rpc_client
, num_send_fails_(0)
, abandoned_(false)
, socket_busy_(false)
, operation_timer_(io_svc)
, l_(l)
{
client_id_ = impl_->assign_client_id();
Expand Down Expand Up @@ -861,6 +862,10 @@ class asio_rpc_client
return client_id_;
}

bool is_abandoned() const override {
return abandoned_;
}

#ifndef SSL_LIBRARY_NOT_FOUND
bool verify_certificate(bool preverified,
asio::ssl::verify_context& ctx)
Expand All @@ -886,6 +891,7 @@ class asio_rpc_client
ptr<asio::steady_timer> timer,
ptr<req_msg>& req,
rpc_handler& when_done,
uint64_t send_timeout_ms,
const ERROR_CODE& err )
{
if ( err || num_send_fails_ >= SEND_RETRY_MAX ) {
Expand All @@ -906,10 +912,10 @@ class asio_rpc_client
when_done(rsp, except);
return;
}
send(req, when_done);
send(req, when_done, send_timeout_ms);
}

virtual void send(ptr<req_msg>& req, rpc_handler& when_done) __override__ {
virtual void send(ptr<req_msg>& req, rpc_handler& when_done, uint64_t send_timeout_ms = 0) __override__ {
if (abandoned_) {
p_er( "client %p to %s:%s is already stale (SSL %s)",
this, host_.c_str(), port_.c_str(),
Expand Down Expand Up @@ -952,6 +958,7 @@ class asio_rpc_client
timer,
req,
when_done,
send_timeout_ms,
std::placeholders::_1 ) );
return;
}
Expand All @@ -968,7 +975,7 @@ class asio_rpc_client

resolver_.async_resolve
( q,
[self, this, req, when_done]
[self, this, req, when_done, send_timeout_ms]
( std::error_code err,
asio::ip::tcp::resolver::iterator itor ) -> void
{
Expand All @@ -980,6 +987,7 @@ class asio_rpc_client
self,
req,
when_done,
send_timeout_ms,
std::placeholders::_1,
std::placeholders::_2 ) );
} else {
Expand Down Expand Up @@ -1014,6 +1022,7 @@ class asio_rpc_client
timer,
req,
when_done,
send_timeout_ms,
std::placeholders::_1 ) );
return;
}
Expand Down Expand Up @@ -1091,6 +1100,17 @@ class asio_rpc_client
}
req_buf->pos(0);

if (send_timeout_ms != 0)
{
operation_timer_.expires_after
( std::chrono::duration_cast<std::chrono::nanoseconds>
( std::chrono::milliseconds( send_timeout_ms ) ) );
operation_timer_.async_wait( std::bind( &asio_rpc_client::cancel_socket,
this,
std::placeholders::_1 ) );
}


// Note: without passing `req_buf` to callback function, it will be
// unreachable before the write is done so that it is freed
// and the memory corruption will occur.
Expand Down Expand Up @@ -1141,8 +1161,21 @@ class asio_rpc_client
#endif
}

/**
 * Wait handler of `operation_timer_`: invoked when the per-request
 * timeout elapses, or when the timer wait itself is cancelled.
 *
 * @param err Error code of the timer wait. A set error code means the
 *            timer was cancelled, in which case nothing is done.
 */
void cancel_socket(const ERROR_CODE& err) {
    // Only act on a genuine expiry while the socket is still open.
    if (!err && socket().is_open()) {
        p_wn("cancelling operations due to socket (%s:%s) timeout",
             host_.c_str(), port_.c_str());
        // Mark the client as abandoned before aborting pending operations.
        abandoned_ = true;
        socket_.cancel();
    }
}

void connected(ptr<req_msg>& req,
rpc_handler& when_done,
uint64_t send_timeout_ms,
std::error_code err,
asio::ip::tcp::resolver::iterator itor)
{
Expand All @@ -1159,10 +1192,11 @@ class asio_rpc_client
this,
req,
when_done,
send_timeout_ms,
std::placeholders::_1 ) );
#endif
} else {
this->send(req, when_done);
this->send(req, when_done, send_timeout_ms);
}

} else {
Expand All @@ -1180,6 +1214,7 @@ class asio_rpc_client

void handle_handshake(ptr<req_msg>& req,
rpc_handler& when_done,
uint64_t send_timeout_ms,
const ERROR_CODE& err)
{
ptr<asio_rpc_client> self = this->shared_from_this();
Expand All @@ -1188,7 +1223,7 @@ class asio_rpc_client
p_in( "handshake with %s:%s succeeded (as a client)",
host_.c_str(), port_.c_str() );
ssl_ready_ = true;
this->send(req, when_done);
this->send(req, when_done, send_timeout_ms);

} else {
abandoned_ = true;
Expand Down Expand Up @@ -1231,6 +1266,7 @@ class asio_rpc_client
std::placeholders::_2));

} else {
operation_timer_.cancel();
abandoned_ = true;
ptr<resp_msg> rsp;
ptr<rpc_exception> except
Expand Down Expand Up @@ -1327,6 +1363,7 @@ class asio_rpc_client
std::placeholders::_1,
std::placeholders::_2 ) );
} else {
operation_timer_.cancel();
set_busy_flag(false);
ptr<rpc_exception> except;
when_done(rsp, except);
Expand All @@ -1348,6 +1385,7 @@ class asio_rpc_client
ctx_buf->pos(0);
rsp->set_ctx(ctx_buf);

operation_timer_.cancel();
set_busy_flag(false);
ptr<rpc_exception> except;
when_done(rsp, except);
Expand Down Expand Up @@ -1397,6 +1435,7 @@ class asio_rpc_client
rsp->set_ctx(actual_ctx);
}

operation_timer_.cancel();
set_busy_flag(false);
ptr<rpc_exception> except;
when_done(rsp, except);
Expand Down Expand Up @@ -1443,6 +1482,7 @@ class asio_rpc_client
std::atomic<bool> abandoned_;
std::atomic<bool> socket_busy_;
uint64_t client_id_;
asio::steady_timer operation_timer_;
ptr<logger> l_;
};

Expand Down
6 changes: 4 additions & 2 deletions src/handle_user_cmd.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ ptr< cmd_result< ptr<buffer> > > raft_server::send_msg_to_leader(ptr<req_msg>& r
{
auto_lock(rpc_clients_lock_);
auto itor = rpc_clients_.find(leader_id);
if (itor == rpc_clients_.end()) {
if (itor == rpc_clients_.end() || itor->second->is_abandoned()) {
ptr<srv_config> srv_conf = c_conf->get_server(leader_id);
if (!srv_conf) {
return cs_new< cmd_result< ptr<buffer> > >(result);
Expand Down Expand Up @@ -185,9 +185,11 @@ ptr< cmd_result< ptr<buffer> > > raft_server::send_msg_to_leader(ptr<req_msg>& r

presult->set_result(resp_ctx, perr);
};
rpc_cli->send(req, handler);

ptr<raft_params> params = ctx_->get_params();

rpc_cli->send(req, handler, params->auto_forwarding_req_timeout_);

if (params->return_method_ == raft_params::blocking) {
presult->get();
}
Expand Down
88 changes: 86 additions & 2 deletions tests/unit/asio_service_test.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@ namespace asio_service_test {

int launch_servers(const std::vector<RaftAsioPkg*>& pkgs,
bool enable_ssl,
bool use_global_asio = false)
bool use_global_asio = false,
const raft_server::init_options & opt = raft_server::init_options())
{
size_t num_srvs = pkgs.size();
CHK_GT(num_srvs, 0);

for (auto& entry: pkgs) {
RaftAsioPkg* pp = entry;
pp->initServer(enable_ssl, use_global_asio);
pp->initServer(enable_ssl, use_global_asio, opt);
}
// Wait longer than upper timeout.
TestSuite::sleep_sec(1);
Expand Down Expand Up @@ -1200,6 +1201,86 @@ int leadership_transfer_test() {
return 0;
}

int auto_forwarding_timeout_test() {
std::string s1_addr = "127.0.0.1:20010";
std::string s2_addr = "127.0.0.1:20020";
std::string s3_addr = "127.0.0.1:20030";

RaftAsioPkg s1(1, s1_addr);
RaftAsioPkg s2(2, s2_addr);
RaftAsioPkg s3(3, s3_addr);
std::vector<RaftAsioPkg*> pkgs = {&s1, &s2, &s3};

raft_server::init_options opt;

/// Make leader quite slow
opt.raft_callback_ = [](cb_func::Type type, cb_func::Param* param) -> cb_func::ReturnCode {
if (type == cb_func::Type::AppendLogs) {
TestSuite::sleep_ms(150);
}
return cb_func::ReturnCode::Ok;
};

CHK_Z( launch_servers(pkgs, false, false, opt) );

_msg("organizing raft group\n");
CHK_Z( make_group(pkgs) );

CHK_TRUE( s1.raftServer->is_leader() );
CHK_EQ(1, s1.raftServer->get_leader());
CHK_EQ(1, s2.raftServer->get_leader());
CHK_EQ(1, s3.raftServer->get_leader());

for (auto& entry: pkgs) {
RaftAsioPkg* pp = entry;
raft_params param = pp->raftServer->get_current_params();
param.auto_forwarding_ = true;
pp->raftServer->update_params(param);
}

std::string test_msg = "test";
ptr<buffer> msg = buffer::alloc(test_msg.size() + 1);
msg->put(test_msg);

// Forwarded as expected
auto ret1 = s3.raftServer->append_entries({msg});
CHK_TRUE(ret1->get_accepted());
CHK_EQ(ret1->get_result_code(), nuraft::cmd_result_code::OK)

for (auto& entry: pkgs) {
RaftAsioPkg* pp = entry;
raft_params param = pp->raftServer->get_current_params();
param.auto_forwarding_req_timeout_ = 100;
pp->raftServer->update_params(param);
}

auto ret2 = s3.raftServer->append_entries({msg});

// Timeout happened
CHK_FALSE(ret2->get_accepted());

for (auto& entry: pkgs) {
RaftAsioPkg* pp = entry;
raft_params param = pp->raftServer->get_current_params();
param.auto_forwarding_req_timeout_ = 0;
pp->raftServer->update_params(param);
}

// Work again
auto ret3 = s3.raftServer->append_entries({msg});
CHK_TRUE(ret3->get_accepted());
CHK_EQ(ret3->get_result_code(), nuraft::cmd_result_code::OK)

s1.raftServer->shutdown();
s2.raftServer->shutdown();
s3.raftServer->shutdown();
TestSuite::sleep_sec(1, "shutting down");

SimpleLogger::shutdown();

return 0;
}

} // namespace asio_service_test;
using namespace asio_service_test;

Expand Down Expand Up @@ -1251,6 +1332,9 @@ int main(int argc, char** argv) {
ts.doTest( "leadership transfer test",
leadership_transfer_test );

ts.doTest( "auto forwarding timeout test",
auto_forwarding_timeout_test );

#ifdef ENABLE_RAFT_STATS
_msg("raft stats: ENABLED\n");
#else
Expand Down
6 changes: 5 additions & 1 deletion tests/unit/fake_network.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ FakeClient::FakeClient(FakeNetwork* mother,

FakeClient::~FakeClient() {}

void FakeClient::send(ptr<req_msg>& req, rpc_handler& when_done) {
void FakeClient::send(ptr<req_msg>& req, rpc_handler& when_done, uint64_t /*send_timeout_ms*/) {
SimpleLogger* ll = motherNet->getBase()->getLogger();
_log_info(ll, "got request %s -> %s, %s",
motherNet->getEndpoint().c_str(),
Expand All @@ -321,6 +321,10 @@ uint64_t FakeClient::get_id() const {
return myId;
}

// The fake client never reports itself as abandoned.
bool FakeClient::is_abandoned() const {
return false;
}


// === FakeTimer

Expand Down
Loading

0 comments on commit 026cf96

Please sign in to comment.