diff --git a/include/libnuraft/raft_params.hxx b/include/libnuraft/raft_params.hxx index 83489525..59b5e654 100644 --- a/include/libnuraft/raft_params.hxx +++ b/include/libnuraft/raft_params.hxx @@ -90,6 +90,7 @@ struct raft_params { , auto_adjust_quorum_for_small_cluster_(false) , locking_method_type_(dual_mutex) , return_method_(blocking) + , auto_forwarding_req_timeout_(0) {} /** @@ -310,6 +311,18 @@ struct raft_params { return *this; } + /** + * Set the auto-forwarding request timeout. + * + * @param timeout_ms New timeout in milliseconds. + * @return self + */ + raft_params& with_auto_forwarding_req_timeout(int32 timeout_ms) { + auto_forwarding_req_timeout_ = timeout_ms; + return *this; + } + + /** * Return heartbeat interval. * If given heartbeat interval is smaller than a specific value @@ -488,6 +501,12 @@ public: * To choose blocking call or asynchronous call. */ return_method_type return_method_; + + /** + * Time in milliseconds to wait for a response after forwarding a request to the leader. + * Must be larger than client_req_timeout_. 0 disables the timeout. 
+ */ + int32 auto_forwarding_req_timeout_; }; } diff --git a/include/libnuraft/rpc_cli.hxx b/include/libnuraft/rpc_cli.hxx index 61ba378e..524fe9bf 100644 --- a/include/libnuraft/rpc_cli.hxx +++ b/include/libnuraft/rpc_cli.hxx @@ -40,9 +40,11 @@ class rpc_client { __interface_body__(rpc_client); public: - virtual void send(ptr& req, rpc_handler& when_done) = 0; + virtual void send(ptr& req, rpc_handler& when_done, uint64_t send_timeout_ms = 0) = 0; virtual uint64_t get_id() const = 0; + + virtual bool is_abandoned() const = 0; }; } diff --git a/src/asio_service.cxx b/src/asio_service.cxx index b538e471..fc9a26fd 100644 --- a/src/asio_service.cxx +++ b/src/asio_service.cxx @@ -828,6 +828,7 @@ class asio_rpc_client , num_send_fails_(0) , abandoned_(false) , socket_busy_(false) + , operation_timer_(io_svc) , l_(l) { client_id_ = impl_->assign_client_id(); @@ -861,6 +862,10 @@ class asio_rpc_client return client_id_; } + bool is_abandoned() const override { + return abandoned_; + } + #ifndef SSL_LIBRARY_NOT_FOUND bool verify_certificate(bool preverified, asio::ssl::verify_context& ctx) @@ -886,6 +891,7 @@ class asio_rpc_client ptr timer, ptr& req, rpc_handler& when_done, + uint64_t send_timeout_ms, const ERROR_CODE& err ) { if ( err || num_send_fails_ >= SEND_RETRY_MAX ) { @@ -906,10 +912,10 @@ class asio_rpc_client when_done(rsp, except); return; } - send(req, when_done); + send(req, when_done, send_timeout_ms); } - virtual void send(ptr& req, rpc_handler& when_done) __override__ { + virtual void send(ptr& req, rpc_handler& when_done, uint64_t send_timeout_ms = 0) __override__ { if (abandoned_) { p_er( "client %p to %s:%s is already stale (SSL %s)", this, host_.c_str(), port_.c_str(), @@ -952,6 +958,7 @@ class asio_rpc_client timer, req, when_done, + send_timeout_ms, std::placeholders::_1 ) ); return; } @@ -968,7 +975,7 @@ class asio_rpc_client resolver_.async_resolve ( q, - [self, this, req, when_done] + [self, this, req, when_done, send_timeout_ms] ( std::error_code 
err, asio::ip::tcp::resolver::iterator itor ) -> void { @@ -980,6 +987,7 @@ class asio_rpc_client self, req, when_done, + send_timeout_ms, std::placeholders::_1, std::placeholders::_2 ) ); } else { @@ -1014,6 +1022,7 @@ class asio_rpc_client timer, req, when_done, + send_timeout_ms, std::placeholders::_1 ) ); return; } @@ -1091,6 +1100,17 @@ class asio_rpc_client } req_buf->pos(0); + if (send_timeout_ms != 0) + { + operation_timer_.expires_after + ( std::chrono::duration_cast + ( std::chrono::milliseconds( send_timeout_ms ) ) ); + operation_timer_.async_wait( std::bind( &asio_rpc_client::cancel_socket, + this, + std::placeholders::_1 ) ); + } + + // Note: without passing `req_buf` to callback function, it will be // unreachable before the write is done so that it is freed // and the memory corruption will occur. @@ -1141,8 +1161,21 @@ class asio_rpc_client #endif } + void cancel_socket(const ERROR_CODE& err) { + if (err) // Timer was cancelled itself, it's OK. + return; + + if (socket().is_open()) { + p_wn("cancelling operations due to socket (%s:%s) timeout", + host_.c_str(), port_.c_str()); + abandoned_ = true; + socket_.cancel(); + } + } + void connected(ptr& req, rpc_handler& when_done, + uint64_t send_timeout_ms, std::error_code err, asio::ip::tcp::resolver::iterator itor) { @@ -1159,10 +1192,11 @@ class asio_rpc_client this, req, when_done, + send_timeout_ms, std::placeholders::_1 ) ); #endif } else { - this->send(req, when_done); + this->send(req, when_done, send_timeout_ms); } } else { @@ -1180,6 +1214,7 @@ class asio_rpc_client void handle_handshake(ptr& req, rpc_handler& when_done, + uint64_t send_timeout_ms, const ERROR_CODE& err) { ptr self = this->shared_from_this(); @@ -1188,7 +1223,7 @@ class asio_rpc_client p_in( "handshake with %s:%s succeeded (as a client)", host_.c_str(), port_.c_str() ); ssl_ready_ = true; - this->send(req, when_done); + this->send(req, when_done, send_timeout_ms); } else { abandoned_ = true; @@ -1231,6 +1266,7 @@ class 
asio_rpc_client std::placeholders::_2)); } else { + operation_timer_.cancel(); abandoned_ = true; ptr rsp; ptr except @@ -1327,6 +1363,7 @@ class asio_rpc_client std::placeholders::_1, std::placeholders::_2 ) ); } else { + operation_timer_.cancel(); set_busy_flag(false); ptr except; when_done(rsp, except); @@ -1348,6 +1385,7 @@ class asio_rpc_client ctx_buf->pos(0); rsp->set_ctx(ctx_buf); + operation_timer_.cancel(); set_busy_flag(false); ptr except; when_done(rsp, except); @@ -1397,6 +1435,7 @@ class asio_rpc_client rsp->set_ctx(actual_ctx); } + operation_timer_.cancel(); set_busy_flag(false); ptr except; when_done(rsp, except); @@ -1443,6 +1482,7 @@ class asio_rpc_client std::atomic abandoned_; std::atomic socket_busy_; uint64_t client_id_; + asio::steady_timer operation_timer_; ptr l_; }; diff --git a/src/handle_user_cmd.cxx b/src/handle_user_cmd.cxx index 4e1d1cc6..c6202c8a 100644 --- a/src/handle_user_cmd.cxx +++ b/src/handle_user_cmd.cxx @@ -147,7 +147,7 @@ ptr< cmd_result< ptr > > raft_server::send_msg_to_leader(ptr& r { auto_lock(rpc_clients_lock_); auto itor = rpc_clients_.find(leader_id); - if (itor == rpc_clients_.end()) { + if (itor == rpc_clients_.end() || itor->second->is_abandoned()) { ptr srv_conf = c_conf->get_server(leader_id); if (!srv_conf) { return cs_new< cmd_result< ptr > >(result); @@ -185,9 +185,11 @@ ptr< cmd_result< ptr > > raft_server::send_msg_to_leader(ptr& r presult->set_result(resp_ctx, perr); }; - rpc_cli->send(req, handler); ptr params = ctx_->get_params(); + + rpc_cli->send(req, handler, params->auto_forwarding_req_timeout_); + if (params->return_method_ == raft_params::blocking) { presult->get(); } diff --git a/tests/unit/asio_service_test.cxx b/tests/unit/asio_service_test.cxx index 211dba3c..cc9d620d 100644 --- a/tests/unit/asio_service_test.cxx +++ b/tests/unit/asio_service_test.cxx @@ -31,14 +31,15 @@ namespace asio_service_test { int launch_servers(const std::vector& pkgs, bool enable_ssl, - bool use_global_asio = false) + 
bool use_global_asio = false, + const raft_server::init_options & opt = raft_server::init_options()) { size_t num_srvs = pkgs.size(); CHK_GT(num_srvs, 0); for (auto& entry: pkgs) { RaftAsioPkg* pp = entry; - pp->initServer(enable_ssl, use_global_asio); + pp->initServer(enable_ssl, use_global_asio, opt); } // Wait longer than upper timeout. TestSuite::sleep_sec(1); @@ -1200,6 +1201,86 @@ int leadership_transfer_test() { return 0; } +int auto_forwarding_timeout_test() { + std::string s1_addr = "127.0.0.1:20010"; + std::string s2_addr = "127.0.0.1:20020"; + std::string s3_addr = "127.0.0.1:20030"; + + RaftAsioPkg s1(1, s1_addr); + RaftAsioPkg s2(2, s2_addr); + RaftAsioPkg s3(3, s3_addr); + std::vector pkgs = {&s1, &s2, &s3}; + + raft_server::init_options opt; + + /// Make leader quite slow + opt.raft_callback_ = [](cb_func::Type type, cb_func::Param* param) -> cb_func::ReturnCode { + if (type == cb_func::Type::AppendLogs) { + TestSuite::sleep_ms(150); + } + return cb_func::ReturnCode::Ok; + }; + + CHK_Z( launch_servers(pkgs, false, false, opt) ); + + _msg("organizing raft group\n"); + CHK_Z( make_group(pkgs) ); + + CHK_TRUE( s1.raftServer->is_leader() ); + CHK_EQ(1, s1.raftServer->get_leader()); + CHK_EQ(1, s2.raftServer->get_leader()); + CHK_EQ(1, s3.raftServer->get_leader()); + + for (auto& entry: pkgs) { + RaftAsioPkg* pp = entry; + raft_params param = pp->raftServer->get_current_params(); + param.auto_forwarding_ = true; + pp->raftServer->update_params(param); + } + + std::string test_msg = "test"; + ptr msg = buffer::alloc(test_msg.size() + 1); + msg->put(test_msg); + + // Forwarded as expected + auto ret1 = s3.raftServer->append_entries({msg}); + CHK_TRUE(ret1->get_accepted()); + CHK_EQ(ret1->get_result_code(), nuraft::cmd_result_code::OK) + + for (auto& entry: pkgs) { + RaftAsioPkg* pp = entry; + raft_params param = pp->raftServer->get_current_params(); + param.auto_forwarding_req_timeout_ = 100; + pp->raftServer->update_params(param); + } + + auto ret2 = 
s3.raftServer->append_entries({msg}); + + // Timeout happened + CHK_FALSE(ret2->get_accepted()); + + for (auto& entry: pkgs) { + RaftAsioPkg* pp = entry; + raft_params param = pp->raftServer->get_current_params(); + param.auto_forwarding_req_timeout_ = 0; + pp->raftServer->update_params(param); + } + + // Work again + auto ret3 = s3.raftServer->append_entries({msg}); + CHK_TRUE(ret3->get_accepted()); + CHK_EQ(ret3->get_result_code(), nuraft::cmd_result_code::OK) + + s1.raftServer->shutdown(); + s2.raftServer->shutdown(); + s3.raftServer->shutdown(); + TestSuite::sleep_sec(1, "shutting down"); + + SimpleLogger::shutdown(); + + return 0; +} + } // namespace asio_service_test; using namespace asio_service_test; @@ -1251,6 +1332,9 @@ int main(int argc, char** argv) { ts.doTest( "leadership transfer test", leadership_transfer_test ); + ts.doTest( "auto forwarding timeout test", + auto_forwarding_timeout_test ); + #ifdef ENABLE_RAFT_STATS _msg("raft stats: ENABLED\n"); #else diff --git a/tests/unit/fake_network.cxx b/tests/unit/fake_network.cxx index 5e7c9514..fb46e72a 100644 --- a/tests/unit/fake_network.cxx +++ b/tests/unit/fake_network.cxx @@ -298,7 +298,7 @@ FakeClient::FakeClient(FakeNetwork* mother, FakeClient::~FakeClient() {} -void FakeClient::send(ptr& req, rpc_handler& when_done) { +void FakeClient::send(ptr& req, rpc_handler& when_done, uint64_t /*send_timeout_ms*/) { SimpleLogger* ll = motherNet->getBase()->getLogger(); _log_info(ll, "got request %s -> %s, %s", motherNet->getEndpoint().c_str(), @@ -321,6 +321,10 @@ uint64_t FakeClient::get_id() const { return myId; } +bool FakeClient::is_abandoned() const { + return false; +} + // === FakeTimer diff --git a/tests/unit/fake_network.hxx b/tests/unit/fake_network.hxx index 020e014a..bbffaa83 100644 --- a/tests/unit/fake_network.hxx +++ b/tests/unit/fake_network.hxx @@ -137,7 +137,7 @@ public: ~FakeClient(); - void send(ptr& req, rpc_handler& when_done); + void send(ptr& req, rpc_handler& when_done, uint64_t 
send_timeout_ms = 0); void dropPackets(); @@ -145,6 +145,8 @@ public: uint64_t get_id() const; + bool is_abandoned() const; + private: uint64_t myId; FakeNetwork* motherNet; diff --git a/tests/unit/raft_package_asio.hxx b/tests/unit/raft_package_asio.hxx index 712f3e4a..75457c28 100644 --- a/tests/unit/raft_package_asio.hxx +++ b/tests/unit/raft_package_asio.hxx @@ -80,7 +80,8 @@ public: } void initServer(bool enable_ssl = false, - bool use_global_asio = false) { + bool use_global_asio = false, + const raft_server::init_options & opt = raft_server::init_options()) { std::string log_file_name = "./srv" + std::to_string(myId) + ".log"; myLogWrapper = cs_new(log_file_name); myLog = myLogWrapper; @@ -126,7 +127,7 @@ public: params.with_client_req_timeout(10000); context* ctx( new context( sMgr, sm, listener, myLog, rpc_cli_factory, scheduler, params ) ); - raftServer = cs_new(ctx); + raftServer = cs_new(ctx, opt); // Listen. asioListener = listener;