Skip to content
This repository has been archived by the owner on Dec 2, 2022. It is now read-only.

RefillStage: stop refilling when everything is loaded #29

Merged
merged 1 commit into from
Sep 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 30 additions & 17 deletions src/downloader/downloader_impl.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
use crate::downloader::{
chain_config::{ChainConfig, ChainsConfig},
headers::{
fetch_receive_stage::FetchReceiveStage, fetch_request_stage::FetchRequestStage,
header_slices::HeaderSlices, preverified_hashes_config::PreverifiedHashesConfig,
refill_stage::RefillStage, retry_stage::RetryStage, save_stage::SaveStage,
verify_stage::VerifyStage,
use crate::{
downloader::{
chain_config::{ChainConfig, ChainsConfig},
headers::{
fetch_receive_stage::FetchReceiveStage, fetch_request_stage::FetchRequestStage,
header_slices, header_slices::HeaderSlices,
preverified_hashes_config::PreverifiedHashesConfig, refill_stage::RefillStage,
retry_stage::RetryStage, save_stage::SaveStage, verify_stage::VerifyStage,
},
opts::Opts,
sentry_client,
sentry_client::SentryClient,
sentry_client_impl::SentryClientImpl,
sentry_client_reactor::SentryClientReactor,
},
opts::Opts,
sentry_client,
sentry_client::SentryClient,
sentry_client_impl::SentryClientImpl,
sentry_client_reactor::SentryClientReactor,
models::BlockNumber,
};
use futures_core::Stream;
use parking_lot::RwLock;
Expand Down Expand Up @@ -56,7 +59,17 @@ impl Downloader {
let mut ui_system = crate::downloader::ui_system::UISystem::new();
ui_system.start();

let header_slices = Arc::new(HeaderSlices::new(50 << 20 /* 50 Mb */));
let preverified_hashes_config = PreverifiedHashesConfig::new(&self.opts.chain_name)?;

let header_slices_mem_limit = 50 << 20; /* 50 Mb */
let header_slices_final_block_num = BlockNumber(
((preverified_hashes_config.hashes.len() - 1) * header_slices::HEADER_SLICE_SIZE)
as u64,
);
let header_slices = Arc::new(HeaderSlices::new(
header_slices_mem_limit,
header_slices_final_block_num,
));
let sentry = Arc::new(RwLock::new(sentry_reactor));

let header_slices_view =
Expand All @@ -80,10 +93,7 @@ impl Downloader {

let retry_stage = RetryStage::new(Arc::clone(&header_slices));

let verify_stage = VerifyStage::new(
Arc::clone(&header_slices),
PreverifiedHashesConfig::new(&self.opts.chain_name)?,
);
let verify_stage = VerifyStage::new(Arc::clone(&header_slices), preverified_hashes_config);

let save_stage = SaveStage::new(Arc::clone(&header_slices));

Expand Down Expand Up @@ -137,6 +147,9 @@ impl Downloader {
if !fetch_receive_stage_ref.can_proceed() {
break;
}
if header_slices.is_empty_at_final_position() {
break;
}

header_slices.notify_status_watchers();
}
Expand Down
30 changes: 27 additions & 3 deletions src/downloader/headers/header_slices.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub struct HeaderSlices {
slices: RwLock<LinkedList<RwLock<HeaderSlice>>>,
max_slices: usize,
max_block_num: AtomicU64,
final_block_num: BlockNumber,
state_watches: HashMap<HeaderSliceStatus, HeaderSliceStatusWatch>,
}

Expand All @@ -55,9 +56,17 @@ pub const HEADER_SLICE_SIZE: usize = 192;
const ATOMIC_ORDERING: Ordering = Ordering::SeqCst;

impl HeaderSlices {
pub fn new(mem_limit: usize) -> Self {
pub fn new(mem_limit: usize, final_block_num: BlockNumber) -> Self {
let max_slices = mem_limit / std::mem::size_of::<Header>() / HEADER_SLICE_SIZE;

assert_eq!(
(final_block_num.0 as usize) % HEADER_SLICE_SIZE,
0,
"final_block_num must be at the slice boundary"
);
let max_slices =
std::cmp::min(max_slices, (final_block_num.0 as usize) / HEADER_SLICE_SIZE);

let mut slices = LinkedList::new();
for i in 0..max_slices {
let slice = HeaderSlice {
Expand Down Expand Up @@ -92,6 +101,7 @@ impl HeaderSlices {
slices: RwLock::new(slices),
max_slices,
max_block_num: AtomicU64::new((max_slices * HEADER_SLICE_SIZE) as u64),
final_block_num,
state_watches,
}
}
Expand Down Expand Up @@ -150,10 +160,16 @@ impl HeaderSlices {
pub fn refill(&self) {
let mut slices = self.slices.write();
let initial_len = slices.len();
let mut count = 0;

for _ in initial_len..self.max_slices {
let max_block_num = self.max_block_num();
if max_block_num >= self.final_block_num {
break;
}

let slice = HeaderSlice {
start_block_num: BlockNumber(self.max_block_num.load(ATOMIC_ORDERING)),
start_block_num: max_block_num,
status: HeaderSliceStatus::Empty,
headers: None,
request_time: None,
Expand All @@ -162,9 +178,9 @@ impl HeaderSlices {
slices.push_back(RwLock::new(slice));
self.max_block_num
.fetch_add(HEADER_SLICE_SIZE as u64, ATOMIC_ORDERING);
count += 1;
}

let count = self.max_slices - initial_len;
let status_watch = &self.state_watches[&HeaderSliceStatus::Empty];
status_watch.count.fetch_add(count, ATOMIC_ORDERING);
}
Expand Down Expand Up @@ -226,4 +242,12 @@ impl HeaderSlices {
pub fn max_block_num(&self) -> BlockNumber {
BlockNumber(self.max_block_num.load(ATOMIC_ORDERING))
}

pub fn final_block_num(&self) -> BlockNumber {
self.final_block_num
}

pub fn is_empty_at_final_position(&self) -> bool {
(self.max_block_num() >= self.final_block_num) && self.slices.read().is_empty()
}
}
8 changes: 5 additions & 3 deletions src/downloader/headers/ui_crossterm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ impl UIView for HeaderSlicesView {
let counters = self.header_slices.status_counters();
let min_block_num = self.header_slices.min_block_num();
let max_block_num = self.header_slices.max_block_num();
let final_block_num = self.header_slices.final_block_num();

let mut stdout = stdout();

Expand All @@ -36,9 +37,10 @@ impl UIView for HeaderSlicesView {

// overall progress
let progress_desc = std::format!(
"downloading headers {} - {} ...",
min_block_num,
max_block_num
"downloading headers {} - {} of {} ...",
min_block_num.0,
max_block_num.0,
final_block_num.0,
);
stdout.queue(style::Print(progress_desc))?;
stdout.queue(terminal::Clear(terminal::ClearType::UntilNewLine))?;
Expand Down
2 changes: 1 addition & 1 deletion src/downloader/headers/verify_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl VerifyStage {
let last = headers.last().unwrap();
let last_hash = last.hash();
let expected_last_hash =
self.preverified_hash(slice.start_block_num + headers.len() as u64 - 1);
self.preverified_hash(slice.start_block_num.0 + headers.len() as u64 - 1);
if expected_last_hash.is_none() {
return false;
}
Expand Down