Skip to content

Commit

Permalink
Generalize SnapshotService implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
ordian committed May 31, 2018
1 parent da7c0f6 commit 402b5ac
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 53 deletions.
18 changes: 12 additions & 6 deletions ethcore/service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ use stop_guard::StopGuard;
use sync::PrivateTxHandler;
use ethcore::client::{Client, ClientConfig, ChainNotify, ClientIoMessage};
use ethcore::miner::Miner;
use ethcore::snapshot::service::{Service as SnapshotService, ServiceParams as SnapServiceParams};
use ethcore::snapshot::service::{
Service as SnapshotService,
ServiceParams as SnapServiceParams,
FullNodeRestorationParams
};
use ethcore::snapshot::{SnapshotService as _SnapshotService, RestorationStatus};
use ethcore::spec::Spec;
use ethcore::account_provider::AccountProvider;
Expand Down Expand Up @@ -67,7 +71,7 @@ impl PrivateTxHandler for PrivateTxService {
pub struct ClientService {
io_service: Arc<IoService<ClientIoMessage>>,
client: Arc<Client>,
snapshot: Arc<SnapshotService>,
snapshot: Arc<SnapshotService<FullNodeRestorationParams>>,
private_tx: Arc<PrivateTxService>,
database: Arc<KeyValueDB>,
_stop_guard: StopGuard,
Expand Down Expand Up @@ -97,9 +101,11 @@ impl ClientService {

let snapshot_params = SnapServiceParams {
engine: spec.engine.clone(),
genesis_block: spec.genesis_block(),
restoration_db_handler: restoration_db_handler,
pruning: pruning,
chain_params: FullNodeRestorationParams {
pruning,
genesis_block: spec.genesis_block(),
},
channel: io_service.channel(),
snapshot_root: snapshot_path.into(),
db_restore: client.clone(),
Expand Down Expand Up @@ -147,7 +153,7 @@ impl ClientService {
}

/// Get snapshot interface.
pub fn snapshot_service(&self) -> Arc<SnapshotService> {
pub fn snapshot_service(&self) -> Arc<SnapshotService<FullNodeRestorationParams>> {
self.snapshot.clone()
}

Expand Down Expand Up @@ -178,7 +184,7 @@ impl ClientService {
/// IO interface for the Client handler
struct ClientIoHandler {
client: Arc<Client>,
snapshot: Arc<SnapshotService>,
snapshot: Arc<SnapshotService<FullNodeRestorationParams>>,
}

const CLIENT_TICK_TIMER: TimerToken = 0;
Expand Down
4 changes: 2 additions & 2 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1279,9 +1279,9 @@ impl Client {
}
}

impl snapshot::DatabaseRestore for Client {
impl snapshot::DatabaseRestore<snapshot::FullNodeRestorationParams> for Client {
/// Restart the client with a new backend
fn restore_db(&self, new_db: &str) -> Result<(), EthcoreError> {
fn restore_db(&self, new_db: &str, _: &snapshot::FullNodeRestorationParams) -> Result<(), EthcoreError> {
trace!(target: "snapshot", "Replacing client database with {:?}", new_db);

let _import_lock = self.importer.import_lock.lock();
Expand Down
2 changes: 1 addition & 1 deletion ethcore/src/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use rand::{Rng, OsRng};
pub use self::error::Error;

pub use self::consensus::*;
pub use self::service::{Service, DatabaseRestore};
pub use self::service::{Service, DatabaseRestore, FullNodeRestorationParams};
pub use self::traits::SnapshotService;
pub use self::watcher::Watcher;
pub use types::snapshot_manifest::ManifestData;
Expand Down
139 changes: 97 additions & 42 deletions ethcore/src/snapshot/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,18 @@ impl Drop for Guard {
}

/// External database restoration handler
pub trait DatabaseRestore: Send + Sync {
pub trait DatabaseRestore<T: ChainRestorationParams>: Send + Sync {
/// Restart with a new backend. Takes ownership of passed database and moves it to a new location.
fn restore_db(&self, new_db: &str) -> Result<(), Error>;
fn restore_db(&self, new_db: &str, params: &T) -> Result<(), Error>;
}

/// State restoration manager.
struct Restoration {
pub struct Restoration {
manifest: ManifestData,
state_chunks_left: HashSet<H256>,
block_chunks_left: HashSet<H256>,
state: StateRebuilder,
// TODO: maybe a more type-safe API?
state: Option<StateRebuilder>,
secondary: Box<Rebuilder>,
writer: Option<LooseWriter>,
snappy_buffer: Bytes,
Expand Down Expand Up @@ -118,7 +119,7 @@ impl Restoration {
manifest: manifest,
state_chunks_left: state_chunks,
block_chunks_left: block_chunks,
state: StateRebuilder::new(raw_db.clone(), params.pruning),
state: Some(StateRebuilder::new(raw_db.clone(), params.pruning)),
secondary: secondary,
writer: params.writer,
snappy_buffer: Vec::new(),
Expand All @@ -130,6 +131,10 @@ impl Restoration {

// feeds a state chunk, aborts early if `flag` becomes false.
fn feed_state(&mut self, hash: H256, chunk: &[u8], flag: &AtomicBool) -> Result<(), Error> {
if self.state.is_none() {
trace!(target: "snapshot", "Feeding a chunk state to a light client");
return Ok(());
}
if self.state_chunks_left.contains(&hash) {
let expected_len = snappy::decompressed_len(chunk)?;
if expected_len > MAX_CHUNK_SIZE {
Expand All @@ -138,7 +143,10 @@ impl Restoration {
}
let len = snappy::decompress_into(chunk, &mut self.snappy_buffer)?;

self.state.feed(&self.snappy_buffer[..len], flag)?;
self.state
.as_mut()
.expect("checked above; qed")
.feed(&self.snappy_buffer[..len], flag)?;

if let Some(ref mut writer) = self.writer.as_mut() {
writer.write_state_chunk(hash, chunk)?;
Expand Down Expand Up @@ -177,15 +185,17 @@ impl Restoration {

if !self.is_done() { return Ok(()) }

// verify final state root.
let root = self.state.state_root();
if root != self.final_state_root {
warn!("Final restored state has wrong state root: expected {:?}, got {:?}", self.final_state_root, root);
bail!(TrieError::InvalidStateRoot(root));
}
if let Some(state) = self.state {
// verify final state root.
let root = state.state_root();
if root != self.final_state_root {
warn!("Final restored state has wrong state root: expected {:?}, got {:?}", self.final_state_root, root);
bail!(TrieError::InvalidStateRoot(root));
}

// check for missing code.
self.state.finalize(self.manifest.block_number, self.manifest.block_hash)?;
// check for missing code.
state.finalize(self.manifest.block_number, self.manifest.block_hash)?;
}

// connect out-of-order chunks and verify chain integrity.
self.secondary.finalize(engine)?;
Expand All @@ -208,13 +218,11 @@ impl Restoration {
pub type Channel = IoChannel<ClientIoMessage>;

/// Snapshot service parameters.
pub struct ServiceParams {
pub struct ServiceParams<T: ChainRestorationParams> {
/// The consensus engine this is built on.
pub engine: Arc<EthEngine>,
/// The chain's genesis block.
pub genesis_block: Bytes,
/// State pruning algorithm.
pub pruning: Algorithm,
/// Additional chain specific restoration params.
pub chain_params: T,
/// Handler for opening a restoration DB.
pub restoration_db_handler: Box<KeyValueDBHandler>,
/// Async IO channel for sending messages.
Expand All @@ -223,42 +231,88 @@ pub struct ServiceParams {
/// Usually "<chain hash>/snapshot"
pub snapshot_root: PathBuf,
/// A handle for database restoration.
pub db_restore: Arc<DatabaseRestore>,
pub db_restore: Arc<DatabaseRestore<T>>,
}

/// Full node specific restoration parameters.
pub struct FullNodeRestorationParams {
/// The chain's genesis block.
pub genesis_block: Bytes,
/// State pruning algorithm.
pub pruning: Algorithm,
}

/// TODO: document
pub trait ChainRestorationParams: Send + Sync {
/// TODO: document
fn restoration(
&self,
manifest: ManifestData,
rest_db: PathBuf,
restoration_db_handler: &KeyValueDBHandler,
writer: Option<LooseWriter>,
engine: &EthEngine,
) -> Result<Restoration, Error>;

/// TODO: document
fn is_light(&self) -> bool {
false
}
}

impl ChainRestorationParams for FullNodeRestorationParams {
fn restoration(
&self,
manifest: ManifestData,
rest_db: PathBuf,
restoration_db_handler: &KeyValueDBHandler,
writer: Option<LooseWriter>,
engine: &EthEngine,
) -> Result<Restoration, Error> {
let params = RestorationParams {
manifest: manifest.clone(),
pruning: self.pruning,
db: restoration_db_handler.open(&rest_db)?,
writer: writer,
genesis: &self.genesis_block,
guard: Guard::new(rest_db),
engine: engine,
};
Restoration::new(params)
}
}

/// `SnapshotService` implementation.
/// This controls taking snapshots and restoring from them.
pub struct Service {
pub struct Service<T: ChainRestorationParams> {
restoration: Mutex<Option<Restoration>>,
restoration_db_handler: Box<KeyValueDBHandler>,
snapshot_root: PathBuf,
io_channel: Mutex<Channel>,
pruning: Algorithm,
status: Mutex<RestorationStatus>,
reader: RwLock<Option<LooseReader>>,
engine: Arc<EthEngine>,
genesis_block: Bytes,
chain_params: T,
state_chunks: AtomicUsize,
block_chunks: AtomicUsize,
db_restore: Arc<DatabaseRestore>,
db_restore: Arc<DatabaseRestore<T>>,
progress: super::Progress,
taking_snapshot: AtomicBool,
restoring_snapshot: AtomicBool,
}

impl Service {
impl<T: ChainRestorationParams> Service<T> {
/// Create a new snapshot service from the given parameters.
pub fn new(params: ServiceParams) -> Result<Self, Error> {
pub fn new(params: ServiceParams<T>) -> Result<Self, Error> {
let mut service = Service {
restoration: Mutex::new(None),
restoration_db_handler: params.restoration_db_handler,
snapshot_root: params.snapshot_root,
io_channel: Mutex::new(params.channel),
pruning: params.pruning,
chain_params: params.chain_params,
status: Mutex::new(RestorationStatus::Inactive),
reader: RwLock::new(None),
engine: params.engine,
genesis_block: params.genesis_block,
state_chunks: AtomicUsize::new(0),
block_chunks: AtomicUsize::new(0),
db_restore: params.db_restore,
Expand Down Expand Up @@ -340,7 +394,7 @@ impl Service {
fn replace_client_db(&self) -> Result<(), Error> {
let our_db = self.restoration_db();

self.db_restore.restore_db(&*our_db.to_string_lossy())?;
self.db_restore.restore_db(&*our_db.to_string_lossy(), &self.chain_params)?;
Ok(())
}

Expand Down Expand Up @@ -464,20 +518,21 @@ impl Service {
false => None
};

let params = RestorationParams {
manifest: manifest.clone(),
pruning: self.pruning,
db: self.restoration_db_handler.open(&rest_db)?,
writer: writer,
genesis: &self.genesis_block,
guard: Guard::new(rest_db),
engine: &*self.engine,
let state_chunks = if self.chain_params.is_light() {
0
} else {
manifest.state_hashes.len()
};

let state_chunks = manifest.state_hashes.len();
let block_chunks = manifest.block_hashes.len();

*res = Some(Restoration::new(params)?);
*res = Some(ChainRestorationParams::restoration(
&self.chain_params,
manifest.clone(),
rest_db,
&*self.restoration_db_handler,
writer,
&*self.engine,
)?);

self.restoring_snapshot.store(true, Ordering::SeqCst);

Expand Down Expand Up @@ -668,7 +723,7 @@ impl Service {
}
}

impl SnapshotService for Service {
impl<T: ChainRestorationParams> SnapshotService for Service<T> {
fn manifest(&self) -> Option<ManifestData> {
self.reader.read().as_ref().map(|r| r.manifest().clone())
}
Expand Down Expand Up @@ -752,7 +807,7 @@ impl SnapshotService for Service {
}
}

impl Drop for Service {
impl<T: ChainRestorationParams> Drop for Service<T> {
fn drop(&mut self) {
self.abort_restore();
}
Expand Down
8 changes: 6 additions & 2 deletions parity/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::sync::Arc;

use hash::keccak;
use ethcore::account_provider::AccountProvider;
use ethcore::snapshot::{Progress, RestorationStatus, SnapshotService as SS};
use ethcore::snapshot::{Progress, RestorationStatus, SnapshotService as SS, FullNodeRestorationParams};
use ethcore::snapshot::io::{SnapshotReader, PackedReader, PackedWriter};
use ethcore::snapshot::service::Service as SnapshotService;
use ethcore::client::{Mode, DatabaseCompactionProfile, VMType};
Expand Down Expand Up @@ -67,7 +67,11 @@ pub struct SnapshotCommand {

// helper for reading chunks from arbitrary reader and feeding them into the
// service.
fn restore_using<R: SnapshotReader>(snapshot: Arc<SnapshotService>, reader: &R, recover: bool) -> Result<(), String> {
fn restore_using<R: SnapshotReader>(
snapshot: Arc<SnapshotService<FullNodeRestorationParams>>,
reader: &R,
recover: bool
) -> Result<(), String> {
let manifest = reader.manifest();

info!("Restoring to block #{} (0x{:?})", manifest.block_number, manifest.block_hash);
Expand Down

0 comments on commit 402b5ac

Please sign in to comment.