diff --git a/src/downloader/downloader_impl.rs b/src/downloader/downloader_impl.rs index 78cb3f8c..3339396b 100644 --- a/src/downloader/downloader_impl.rs +++ b/src/downloader/downloader_impl.rs @@ -1,16 +1,19 @@ -use crate::downloader::{ - chain_config::{ChainConfig, ChainsConfig}, - headers::{ - fetch_receive_stage::FetchReceiveStage, fetch_request_stage::FetchRequestStage, - header_slices::HeaderSlices, preverified_hashes_config::PreverifiedHashesConfig, - refill_stage::RefillStage, retry_stage::RetryStage, save_stage::SaveStage, - verify_stage::VerifyStage, +use crate::{ + downloader::{ + chain_config::{ChainConfig, ChainsConfig}, + headers::{ + fetch_receive_stage::FetchReceiveStage, fetch_request_stage::FetchRequestStage, + header_slices, header_slices::HeaderSlices, + preverified_hashes_config::PreverifiedHashesConfig, refill_stage::RefillStage, + retry_stage::RetryStage, save_stage::SaveStage, verify_stage::VerifyStage, + }, + opts::Opts, + sentry_client, + sentry_client::SentryClient, + sentry_client_impl::SentryClientImpl, + sentry_client_reactor::SentryClientReactor, }, - opts::Opts, - sentry_client, - sentry_client::SentryClient, - sentry_client_impl::SentryClientImpl, - sentry_client_reactor::SentryClientReactor, + models::BlockNumber, }; use futures_core::Stream; use parking_lot::RwLock; @@ -56,7 +59,17 @@ impl Downloader { let mut ui_system = crate::downloader::ui_system::UISystem::new(); ui_system.start(); - let header_slices = Arc::new(HeaderSlices::new(50 << 20 /* 50 Mb */)); + let preverified_hashes_config = PreverifiedHashesConfig::new(&self.opts.chain_name)?; + + let header_slices_mem_limit = 50 << 20; /* 50 Mb */ + let header_slices_final_block_num = BlockNumber( + ((preverified_hashes_config.hashes.len() - 1) * header_slices::HEADER_SLICE_SIZE) + as u64, + ); + let header_slices = Arc::new(HeaderSlices::new( + header_slices_mem_limit, + header_slices_final_block_num, + )); let sentry = Arc::new(RwLock::new(sentry_reactor)); let header_slices_view = @@ -80,10 +93,7 @@ impl Downloader { let retry_stage = RetryStage::new(Arc::clone(&header_slices)); - let verify_stage = VerifyStage::new( - Arc::clone(&header_slices), - PreverifiedHashesConfig::new(&self.opts.chain_name)?, - ); + let verify_stage = VerifyStage::new(Arc::clone(&header_slices), preverified_hashes_config); let save_stage = SaveStage::new(Arc::clone(&header_slices)); @@ -137,6 +147,9 @@ impl Downloader { if !fetch_receive_stage_ref.can_proceed() { break; } + if header_slices.is_empty_at_final_position() { + break; + } header_slices.notify_status_watchers(); } diff --git a/src/downloader/headers/header_slices.rs b/src/downloader/headers/header_slices.rs index 876e2679..17d6947d 100644 --- a/src/downloader/headers/header_slices.rs +++ b/src/downloader/headers/header_slices.rs @@ -47,6 +47,7 @@ pub struct HeaderSlices { slices: RwLock>>, max_slices: usize, max_block_num: AtomicU64, + final_block_num: BlockNumber, state_watches: HashMap, } @@ -55,9 +56,17 @@ pub const HEADER_SLICE_SIZE: usize = 192; const ATOMIC_ORDERING: Ordering = Ordering::SeqCst; impl HeaderSlices { - pub fn new(mem_limit: usize) -> Self { + pub fn new(mem_limit: usize, final_block_num: BlockNumber) -> Self { let max_slices = mem_limit / std::mem::size_of::
() / HEADER_SLICE_SIZE; + assert_eq!( + (final_block_num.0 as usize) % HEADER_SLICE_SIZE, + 0, + "final_block_num must be at the slice boundary" + ); + let max_slices = + std::cmp::min(max_slices, (final_block_num.0 as usize) / HEADER_SLICE_SIZE); + let mut slices = LinkedList::new(); for i in 0..max_slices { let slice = HeaderSlice { @@ -92,6 +101,7 @@ impl HeaderSlices { slices: RwLock::new(slices), max_slices, max_block_num: AtomicU64::new((max_slices * HEADER_SLICE_SIZE) as u64), + final_block_num, state_watches, } } @@ -150,10 +160,16 @@ impl HeaderSlices { pub fn refill(&self) { let mut slices = self.slices.write(); let initial_len = slices.len(); + let mut count = 0; for _ in initial_len..self.max_slices { + let max_block_num = self.max_block_num(); + if max_block_num >= self.final_block_num { + break; + } + let slice = HeaderSlice { - start_block_num: BlockNumber(self.max_block_num.load(ATOMIC_ORDERING)), + start_block_num: max_block_num, status: HeaderSliceStatus::Empty, headers: None, request_time: None, @@ -162,9 +178,9 @@ impl HeaderSlices { slices.push_back(RwLock::new(slice)); self.max_block_num .fetch_add(HEADER_SLICE_SIZE as u64, ATOMIC_ORDERING); + count += 1; } - let count = self.max_slices - initial_len; let status_watch = &self.state_watches[&HeaderSliceStatus::Empty]; status_watch.count.fetch_add(count, ATOMIC_ORDERING); } @@ -226,4 +242,12 @@ impl HeaderSlices { pub fn max_block_num(&self) -> BlockNumber { BlockNumber(self.max_block_num.load(ATOMIC_ORDERING)) } + + pub fn final_block_num(&self) -> BlockNumber { + self.final_block_num + } + + pub fn is_empty_at_final_position(&self) -> bool { + (self.max_block_num() >= self.final_block_num) && self.slices.read().is_empty() + } } diff --git a/src/downloader/headers/ui_crossterm.rs b/src/downloader/headers/ui_crossterm.rs index bb5ca7f2..1ad1173c 100644 --- a/src/downloader/headers/ui_crossterm.rs +++ b/src/downloader/headers/ui_crossterm.rs @@ -24,6 +24,7 @@ impl UIView for HeaderSlicesView { let counters = self.header_slices.status_counters(); let min_block_num = self.header_slices.min_block_num(); let max_block_num = self.header_slices.max_block_num(); + let final_block_num = self.header_slices.final_block_num(); let mut stdout = stdout(); @@ -36,9 +37,10 @@ impl UIView for HeaderSlicesView { // overall progress let progress_desc = std::format!( - "downloading headers {} - {} ...", - min_block_num, - max_block_num + "downloading headers {} - {} of {} ...", + min_block_num.0, + max_block_num.0, + final_block_num.0, ); stdout.queue(style::Print(progress_desc))?; stdout.queue(terminal::Clear(terminal::ClearType::UntilNewLine))?; diff --git a/src/downloader/headers/verify_stage.rs b/src/downloader/headers/verify_stage.rs index f760307c..1dae1067 100644 --- a/src/downloader/headers/verify_stage.rs +++ b/src/downloader/headers/verify_stage.rs @@ -98,7 +98,7 @@ impl VerifyStage { let last = headers.last().unwrap(); let last_hash = last.hash(); let expected_last_hash = - self.preverified_hash(slice.start_block_num + headers.len() as u64 - 1); + self.preverified_hash(slice.start_block_num.0 + headers.len() as u64 - 1); if expected_last_hash.is_none() { return false; }