Skip to content
This repository has been archived by the owner on Dec 2, 2022. It is now read-only.

Commit

Permalink
RefillStage: stop refilling when everything is loaded.
Browse files Browse the repository at this point in the history
Preverified headers downloader process stops when it loads
all header slices until the max known preverified hash.

Since preverified hashes act as slices boundaries,
the number of slices to download is 1 less than the number of hashes.
  • Loading branch information
battlmonstr committed Sep 16, 2021
1 parent 91c2267 commit 0ff0a10
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 24 deletions.
47 changes: 30 additions & 17 deletions src/downloader/downloader_impl.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
use crate::downloader::{
chain_config::{ChainConfig, ChainsConfig},
headers::{
fetch_receive_stage::FetchReceiveStage, fetch_request_stage::FetchRequestStage,
header_slices::HeaderSlices, preverified_hashes_config::PreverifiedHashesConfig,
refill_stage::RefillStage, retry_stage::RetryStage, save_stage::SaveStage,
verify_stage::VerifyStage,
use crate::{
downloader::{
chain_config::{ChainConfig, ChainsConfig},
headers::{
fetch_receive_stage::FetchReceiveStage, fetch_request_stage::FetchRequestStage,
header_slices, header_slices::HeaderSlices,
preverified_hashes_config::PreverifiedHashesConfig, refill_stage::RefillStage,
retry_stage::RetryStage, save_stage::SaveStage, verify_stage::VerifyStage,
},
opts::Opts,
sentry_client,
sentry_client::SentryClient,
sentry_client_impl::SentryClientImpl,
sentry_client_reactor::SentryClientReactor,
},
opts::Opts,
sentry_client,
sentry_client::SentryClient,
sentry_client_impl::SentryClientImpl,
sentry_client_reactor::SentryClientReactor,
models::BlockNumber,
};
use futures_core::Stream;
use parking_lot::RwLock;
Expand Down Expand Up @@ -56,7 +59,17 @@ impl Downloader {
let mut ui_system = crate::downloader::ui_system::UISystem::new();
ui_system.start();

let header_slices = Arc::new(HeaderSlices::new(50 << 20 /* 50 Mb */));
let preverified_hashes_config = PreverifiedHashesConfig::new(&self.opts.chain_name)?;

let header_slices_mem_limit = 50 << 20; /* 50 Mb */
let header_slices_final_block_num = BlockNumber(
((preverified_hashes_config.hashes.len() - 1) * header_slices::HEADER_SLICE_SIZE)
as u64,
);
let header_slices = Arc::new(HeaderSlices::new(
header_slices_mem_limit,
header_slices_final_block_num,
));
let sentry = Arc::new(RwLock::new(sentry_reactor));

let header_slices_view =
Expand All @@ -80,10 +93,7 @@ impl Downloader {

let retry_stage = RetryStage::new(Arc::clone(&header_slices));

let verify_stage = VerifyStage::new(
Arc::clone(&header_slices),
PreverifiedHashesConfig::new(&self.opts.chain_name)?,
);
let verify_stage = VerifyStage::new(Arc::clone(&header_slices), preverified_hashes_config);

let save_stage = SaveStage::new(Arc::clone(&header_slices));

Expand Down Expand Up @@ -137,6 +147,9 @@ impl Downloader {
if !fetch_receive_stage_ref.can_proceed() {
break;
}
if header_slices.is_empty_at_final_position() {
break;
}

header_slices.notify_status_watchers();
}
Expand Down
30 changes: 27 additions & 3 deletions src/downloader/headers/header_slices.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub struct HeaderSlices {
slices: RwLock<LinkedList<RwLock<HeaderSlice>>>,
max_slices: usize,
max_block_num: AtomicU64,
final_block_num: BlockNumber,
state_watches: HashMap<HeaderSliceStatus, HeaderSliceStatusWatch>,
}

Expand All @@ -55,9 +56,17 @@ pub const HEADER_SLICE_SIZE: usize = 192;
const ATOMIC_ORDERING: Ordering = Ordering::SeqCst;

impl HeaderSlices {
pub fn new(mem_limit: usize) -> Self {
pub fn new(mem_limit: usize, final_block_num: BlockNumber) -> Self {
let max_slices = mem_limit / std::mem::size_of::<Header>() / HEADER_SLICE_SIZE;

assert_eq!(
(final_block_num.0 as usize) % HEADER_SLICE_SIZE,
0,
"final_block_num must be at the slice boundary"
);
let max_slices =
std::cmp::min(max_slices, (final_block_num.0 as usize) / HEADER_SLICE_SIZE);

let mut slices = LinkedList::new();
for i in 0..max_slices {
let slice = HeaderSlice {
Expand Down Expand Up @@ -92,6 +101,7 @@ impl HeaderSlices {
slices: RwLock::new(slices),
max_slices,
max_block_num: AtomicU64::new((max_slices * HEADER_SLICE_SIZE) as u64),
final_block_num,
state_watches,
}
}
Expand Down Expand Up @@ -150,10 +160,16 @@ impl HeaderSlices {
pub fn refill(&self) {
let mut slices = self.slices.write();
let initial_len = slices.len();
let mut count = 0;

for _ in initial_len..self.max_slices {
let max_block_num = self.max_block_num();
if max_block_num >= self.final_block_num {
break;
}

let slice = HeaderSlice {
start_block_num: BlockNumber(self.max_block_num.load(ATOMIC_ORDERING)),
start_block_num: max_block_num,
status: HeaderSliceStatus::Empty,
headers: None,
request_time: None,
Expand All @@ -162,9 +178,9 @@ impl HeaderSlices {
slices.push_back(RwLock::new(slice));
self.max_block_num
.fetch_add(HEADER_SLICE_SIZE as u64, ATOMIC_ORDERING);
count += 1;
}

let count = self.max_slices - initial_len;
let status_watch = &self.state_watches[&HeaderSliceStatus::Empty];
status_watch.count.fetch_add(count, ATOMIC_ORDERING);
}
Expand Down Expand Up @@ -226,4 +242,12 @@ impl HeaderSlices {
pub fn max_block_num(&self) -> BlockNumber {
BlockNumber(self.max_block_num.load(ATOMIC_ORDERING))
}

pub fn final_block_num(&self) -> BlockNumber {
self.final_block_num
}

pub fn is_empty_at_final_position(&self) -> bool {
(self.max_block_num() >= self.final_block_num) && self.slices.read().is_empty()
}
}
8 changes: 5 additions & 3 deletions src/downloader/headers/ui_crossterm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ impl UIView for HeaderSlicesView {
let counters = self.header_slices.status_counters();
let min_block_num = self.header_slices.min_block_num();
let max_block_num = self.header_slices.max_block_num();
let final_block_num = self.header_slices.final_block_num();

let mut stdout = stdout();

Expand All @@ -36,9 +37,10 @@ impl UIView for HeaderSlicesView {

// overall progress
let progress_desc = std::format!(
"downloading headers {} - {} ...",
min_block_num,
max_block_num
"downloading headers {} - {} of {} ...",
min_block_num.0,
max_block_num.0,
final_block_num.0,
);
stdout.queue(style::Print(progress_desc))?;
stdout.queue(terminal::Clear(terminal::ClearType::UntilNewLine))?;
Expand Down
2 changes: 1 addition & 1 deletion src/downloader/headers/verify_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl VerifyStage {
let last = headers.last().unwrap();
let last_hash = last.hash();
let expected_last_hash =
self.preverified_hash(slice.start_block_num + headers.len() as u64 - 1);
self.preverified_hash(slice.start_block_num.0 + headers.len() as u64 - 1);
if expected_last_hash.is_none() {
return false;
}
Expand Down

0 comments on commit 0ff0a10

Please sign in to comment.