From 0ff0a106c1d4a11912ca266b610582614be269ff Mon Sep 17 00:00:00 2001 From: battlmonstr Date: Sun, 29 Aug 2021 19:13:28 +0200 Subject: [PATCH] RefillStage: stop refilling when everything is loaded. Preverified headers downloader process stops when it loads all header slices until the max known preverified hash. Since preverified hashes act as slices boundaries, the number of slices to download is 1 less than the number of hashes. --- src/downloader/downloader_impl.rs | 47 ++++++++++++++++--------- src/downloader/headers/header_slices.rs | 30 ++++++++++++++-- src/downloader/headers/ui_crossterm.rs | 8 +++-- src/downloader/headers/verify_stage.rs | 2 +- 4 files changed, 63 insertions(+), 24 deletions(-) diff --git a/src/downloader/downloader_impl.rs b/src/downloader/downloader_impl.rs index 78cb3f8c..3339396b 100644 --- a/src/downloader/downloader_impl.rs +++ b/src/downloader/downloader_impl.rs @@ -1,16 +1,19 @@ -use crate::downloader::{ - chain_config::{ChainConfig, ChainsConfig}, - headers::{ - fetch_receive_stage::FetchReceiveStage, fetch_request_stage::FetchRequestStage, - header_slices::HeaderSlices, preverified_hashes_config::PreverifiedHashesConfig, - refill_stage::RefillStage, retry_stage::RetryStage, save_stage::SaveStage, - verify_stage::VerifyStage, +use crate::{ + downloader::{ + chain_config::{ChainConfig, ChainsConfig}, + headers::{ + fetch_receive_stage::FetchReceiveStage, fetch_request_stage::FetchRequestStage, + header_slices, header_slices::HeaderSlices, + preverified_hashes_config::PreverifiedHashesConfig, refill_stage::RefillStage, + retry_stage::RetryStage, save_stage::SaveStage, verify_stage::VerifyStage, + }, + opts::Opts, + sentry_client, + sentry_client::SentryClient, + sentry_client_impl::SentryClientImpl, + sentry_client_reactor::SentryClientReactor, }, - opts::Opts, - sentry_client, - sentry_client::SentryClient, - sentry_client_impl::SentryClientImpl, - sentry_client_reactor::SentryClientReactor, + models::BlockNumber, }; use futures_core::Stream; use parking_lot::RwLock; @@ -56,7 +59,17 @@ impl Downloader { let mut ui_system = crate::downloader::ui_system::UISystem::new(); ui_system.start(); - let header_slices = Arc::new(HeaderSlices::new(50 << 20 /* 50 Mb */)); + let preverified_hashes_config = PreverifiedHashesConfig::new(&self.opts.chain_name)?; + + let header_slices_mem_limit = 50 << 20; /* 50 Mb */ + let header_slices_final_block_num = BlockNumber( + ((preverified_hashes_config.hashes.len() - 1) * header_slices::HEADER_SLICE_SIZE) + as u64, + ); + let header_slices = Arc::new(HeaderSlices::new( + header_slices_mem_limit, + header_slices_final_block_num, + )); let sentry = Arc::new(RwLock::new(sentry_reactor)); let header_slices_view = @@ -80,10 +93,7 @@ impl Downloader { let retry_stage = RetryStage::new(Arc::clone(&header_slices)); - let verify_stage = VerifyStage::new( - Arc::clone(&header_slices), - PreverifiedHashesConfig::new(&self.opts.chain_name)?, - ); + let verify_stage = VerifyStage::new(Arc::clone(&header_slices), preverified_hashes_config); let save_stage = SaveStage::new(Arc::clone(&header_slices)); @@ -137,6 +147,9 @@ impl Downloader { if !fetch_receive_stage_ref.can_proceed() { break; } + if header_slices.is_empty_at_final_position() { + break; + } header_slices.notify_status_watchers(); } diff --git a/src/downloader/headers/header_slices.rs b/src/downloader/headers/header_slices.rs index 876e2679..17d6947d 100644 --- a/src/downloader/headers/header_slices.rs +++ b/src/downloader/headers/header_slices.rs @@ -47,6 +47,7 @@ pub struct HeaderSlices { slices: RwLock>>, max_slices: usize, max_block_num: AtomicU64, + final_block_num: BlockNumber, state_watches: HashMap, } @@ -55,9 +56,17 @@ pub const HEADER_SLICE_SIZE: usize = 192; const ATOMIC_ORDERING: Ordering = Ordering::SeqCst; impl HeaderSlices { - pub fn new(mem_limit: usize) -> Self { + pub fn new(mem_limit: usize, final_block_num: BlockNumber) -> Self { let max_slices = mem_limit / std::mem::size_of::
() / HEADER_SLICE_SIZE; + assert_eq!( + (final_block_num.0 as usize) % HEADER_SLICE_SIZE, + 0, + "final_block_num must be at the slice boundary" + ); + let max_slices = + std::cmp::min(max_slices, (final_block_num.0 as usize) / HEADER_SLICE_SIZE); + let mut slices = LinkedList::new(); for i in 0..max_slices { let slice = HeaderSlice { @@ -92,6 +101,7 @@ impl HeaderSlices { slices: RwLock::new(slices), max_slices, max_block_num: AtomicU64::new((max_slices * HEADER_SLICE_SIZE) as u64), + final_block_num, state_watches, } } @@ -150,10 +160,16 @@ impl HeaderSlices { pub fn refill(&self) { let mut slices = self.slices.write(); let initial_len = slices.len(); + let mut count = 0; for _ in initial_len..self.max_slices { + let max_block_num = self.max_block_num(); + if max_block_num >= self.final_block_num { + break; + } + let slice = HeaderSlice { - start_block_num: BlockNumber(self.max_block_num.load(ATOMIC_ORDERING)), + start_block_num: max_block_num, status: HeaderSliceStatus::Empty, headers: None, request_time: None, @@ -162,9 +178,9 @@ impl HeaderSlices { slices.push_back(RwLock::new(slice)); self.max_block_num .fetch_add(HEADER_SLICE_SIZE as u64, ATOMIC_ORDERING); + count += 1; } - let count = self.max_slices - initial_len; let status_watch = &self.state_watches[&HeaderSliceStatus::Empty]; status_watch.count.fetch_add(count, ATOMIC_ORDERING); } @@ -226,4 +242,12 @@ impl HeaderSlices { pub fn max_block_num(&self) -> BlockNumber { BlockNumber(self.max_block_num.load(ATOMIC_ORDERING)) } + + pub fn final_block_num(&self) -> BlockNumber { + self.final_block_num + } + + pub fn is_empty_at_final_position(&self) -> bool { + (self.max_block_num() >= self.final_block_num) && self.slices.read().is_empty() + } } diff --git a/src/downloader/headers/ui_crossterm.rs b/src/downloader/headers/ui_crossterm.rs index bb5ca7f2..1ad1173c 100644 --- a/src/downloader/headers/ui_crossterm.rs +++ b/src/downloader/headers/ui_crossterm.rs @@ -24,6 +24,7 @@ impl UIView for HeaderSlicesView { let counters = self.header_slices.status_counters(); let min_block_num = self.header_slices.min_block_num(); let max_block_num = self.header_slices.max_block_num(); + let final_block_num = self.header_slices.final_block_num(); let mut stdout = stdout(); @@ -36,9 +37,10 @@ impl UIView for HeaderSlicesView { // overall progress let progress_desc = std::format!( - "downloading headers {} - {} ...", - min_block_num, - max_block_num + "downloading headers {} - {} of {} ...", + min_block_num.0, + max_block_num.0, + final_block_num.0, ); stdout.queue(style::Print(progress_desc))?; stdout.queue(terminal::Clear(terminal::ClearType::UntilNewLine))?; diff --git a/src/downloader/headers/verify_stage.rs b/src/downloader/headers/verify_stage.rs index f760307c..1dae1067 100644 --- a/src/downloader/headers/verify_stage.rs +++ b/src/downloader/headers/verify_stage.rs @@ -98,7 +98,7 @@ impl VerifyStage { let last = headers.last().unwrap(); let last_hash = last.hash(); let expected_last_hash = - self.preverified_hash(slice.start_block_num + headers.len() as u64 - 1); + self.preverified_hash(slice.start_block_num.0 + headers.len() as u64 - 1); if expected_last_hash.is_none() { return false; }