Skip to content

Commit

Permalink
improved sync, avoid duplicate downloads
Browse files Browse the repository at this point in the history
  • Loading branch information
madMAx43v3r committed Oct 28, 2022
1 parent 474b12c commit 355d01e
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 21 deletions.
3 changes: 3 additions & 0 deletions include/mmx/Router.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,15 @@ class Router : public RouterBase {
// Per-height block-sync job state: tracks which peers were asked for the
// block hash / block at a given height, what they answered, and which
// blocks have been received, so duplicate downloads can be avoided.
struct sync_job_t {
bool is_done = false; // set once the job has finished and can be discarded
uint32_t height = 0; // block height this job is fetching
uint32_t num_fetch = 0; // number of Node_get_block requests issued so far
int64_t start_time_ms = 0; // when the job was (re)started; used for timeout / elapsed-time checks
int64_t last_recv_ms = 0; // timestamp of the most recent reply for this job
std::unordered_set<uint64_t> failed; // clients that returned no result (or an unusable one)
std::unordered_set<uint64_t> pending; // clients with a request currently in flight
std::unordered_set<uint64_t> succeeded; // clients whose reported hash was confirmed by a received valid block
std::unordered_map<uint64_t, hash_t> got_hash; // [client, block hash the client reported for this height]
std::unordered_map<uint32_t, uint64_t> request_map; // [request id, client]
std::unordered_map<hash_t, int64_t> pending_blocks; // [hash, timeout deadline (ms) before the block may be re-requested]
std::unordered_map<hash_t, std::shared_ptr<const Block>> blocks; // received valid blocks, keyed by block hash
};

Expand Down
100 changes: 79 additions & 21 deletions src/Router.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,10 @@ void Router::update()
job->height = height;
}
job->start_time_ms = now_ms;
log(WARN) << "Timeout on sync job for height " << job->height << ", trying again ...";
log(WARN) << "Timeout on sync job for height " << job->height << ": "
<< job->got_hash.size() << " reply, " << job->num_fetch << " fetch, "
<< job->failed.size() << " failed, " << job->pending.size() << " pending, "
<< job->succeeded.size() << " succeeded";
}
}
}
Expand All @@ -519,7 +522,6 @@ bool Router::process(std::shared_ptr<const Return> ret)
bool did_consume = false;
for(auto& entry : sync_jobs)
{
// TODO: re-design to avoid duplicate downloads
const auto& request_id = entry.first;
auto& job = entry.second;
const auto elapsed_ms = now_ms - job->start_time_ms;
Expand All @@ -528,18 +530,42 @@ bool Router::process(std::shared_ptr<const Return> ret)
auto iter = job->request_map.find(ret->id);
if(iter != job->request_map.end()) {
const auto client = iter->second;
if(auto result = std::dynamic_pointer_cast<const Node_get_block_at_return>(ret->result)) {
if(auto result = std::dynamic_pointer_cast<const Node_get_block_hash_return>(ret->result)) {
if(auto hash = result->_ret_0) {
if(job->blocks.count(*hash)) {
job->succeeded.insert(client);
}
job->got_hash[client] = *hash;
} else {
job->failed.insert(client);
}
}
else if(auto result = std::dynamic_pointer_cast<const Node_get_block_return>(ret->result)) {
if(auto block = result->_ret_0) {
const auto& hash = block->hash;
if(block->is_valid()) {
job->blocks[block->content_hash] = block;
for(const auto& entry : job->got_hash) {
if(entry.second == hash) {
job->succeeded.insert(entry.first);
}
}
job->succeeded.insert(client);
job->blocks[hash] = block;
} else {
ban_peer(client, "they sent us an invalid block");
}
} else {
job->pending_blocks.erase(hash);
}
if(!job->succeeded.count(client)) {
job->failed.insert(client);
}
}
else if(auto result = std::dynamic_pointer_cast<const vnx::Exception>(ret->result)) {
auto got_hash = job->got_hash.find(client);
if(got_hash != job->got_hash.end()) {
job->pending_blocks.erase(got_hash->second);
}
}
job->pending.erase(client);
job->request_map.erase(iter);
job->last_recv_ms = now_ms;
Expand All @@ -550,42 +576,74 @@ bool Router::process(std::shared_ptr<const Return> ret)
}
// check for disconnects
for(auto iter = job->pending.begin(); iter != job->pending.end();) {
if(synced_peers.count(*iter)) {
const auto client = *iter;
if(synced_peers.count(client)) {
iter++;
} else {
auto iter2 = job->got_hash.find(client);
if(iter2 != job->got_hash.end()) {
job->pending_blocks.erase(iter2->second);
job->got_hash.erase(iter2);
}
iter = job->pending.erase(iter);
}
}
const auto num_returns = job->failed.size() + job->succeeded.size();
if(num_returns < min_sync_peers) {
const auto num_left = (min_sync_peers + 1 + (elapsed_ms / 5000)) - num_returns;
if(job->pending.size() < num_left) {
auto clients = synced_peers;
for(auto id : job->failed) {
clients.erase(id);
}
for(auto id : job->succeeded) {
clients.erase(id);
// fetch block hashes
const auto max_pending = min_sync_peers + 3;
const auto num_pending = job->pending.size() + job->got_hash.size();
if(num_pending < max_pending) {
std::set<uint64_t> clients;
for(auto client : synced_peers) {
if(!job->failed.count(client) && !job->pending.count(client)
&& !job->succeeded.count(client) && !job->got_hash.count(client))
{
clients.insert(client);
}
}
// TODO: prefer non-blocked peers
for(auto client : get_subset(clients, num_left, rand_engine))
{
auto req = Node_get_block_at::create();
for(const auto client : get_subset(clients, max_pending - num_pending, rand_engine)) {
// TODO: Node_get_block_hash_ex
auto req = Node_get_block_hash::create();
req->height = job->height;
const auto id = send_request(client, req);
job->request_map[id] = client;
job->pending.insert(client);
}
}
// fetch blocks
std::set<std::pair<uint64_t, hash_t>> clients;
for(const auto& entry : job->got_hash) {
const auto& hash = entry.second;
if(!job->blocks.count(hash)) {
const auto client = entry.first;
if(!job->failed.count(client) && !job->pending.count(client) && !job->succeeded.count(client)) {
clients.emplace(client, hash);
}
}
}
for(const auto& entry : get_subset(clients, clients.size(), rand_engine)) {
const auto client = entry.first;
const auto& hash = entry.second;
auto pending = job->pending_blocks.find(hash);
if(pending == job->pending_blocks.end() || now_ms > pending->second) {
auto req = Node_get_block::create();
req->hash = hash;
const auto id = send_request(client, req);
job->request_map[id] = client;
job->pending.insert(client);
job->pending_blocks[hash] = now_ms + fetch_timeout_ms / 8;
job->num_fetch++;
}
}
} else {
uint32_t max_block_size = 0;
for(const auto& entry : job->blocks) {
max_block_size = std::max(entry.second->tx_cost, max_block_size);
}
log(DEBUG) << "Got " << job->blocks.size() << " blocks for height " << job->height << " by fetching "
<< job->succeeded.size() + job->failed.size() << " times, " << job->failed.size() << " failed"
<< ", size = " << to_value(max_block_size, params) << " MMX"
<< ", took " << elapsed_ms / 1e3 << " sec";
<< job->num_fetch << " times, " << job->got_hash.size() << " reply, " << job->failed.size() << " failed"
<< ", size = " << to_value(max_block_size, params) << " MMX" << ", took " << elapsed_ms / 1e3 << " sec";
// we are done with the job
std::vector<std::shared_ptr<const Block>> blocks;
for(const auto& entry : job->blocks) {
Expand Down

0 comments on commit 355d01e

Please sign in to comment.