From 669d611e7b018e6f2e2d66dc931a52f39d9b7fcf Mon Sep 17 00:00:00 2001 From: yukang Date: Tue, 21 Nov 2023 23:51:38 +0800 Subject: [PATCH] use OnceCell for flag --- db-migration/src/lib.rs | 71 ++++++++++++++++++++++----------------- db-migration/src/tests.rs | 65 +++++++++++++++++++++++++++++++++-- 2 files changed, 103 insertions(+), 33 deletions(-) diff --git a/db-migration/src/lib.rs b/db-migration/src/lib.rs index 29987813a27..28f00cba23e 100644 --- a/db-migration/src/lib.rs +++ b/db-migration/src/lib.rs @@ -1,5 +1,7 @@ //! TODO(doc): @quake -use ckb_channel::{unbounded, Receiver}; +use ckb_channel::select; +use ckb_channel::unbounded; +use ckb_channel::Receiver; use ckb_db::{ReadOnlyDB, RocksDB}; use ckb_db_schema::{COLUMN_META, META_TIP_HEADER_KEY, MIGRATION_VERSION_KEY}; use ckb_error::{Error, InternalErrorKind}; @@ -7,18 +9,17 @@ use ckb_logger::{debug, error, info}; use ckb_stop_handler::register_thread; use console::Term; pub use indicatif::{HumanDuration, MultiProgress, ProgressBar, ProgressDrawTarget, ProgressStyle}; +use once_cell::sync::OnceCell; use std::cmp::Ordering; use std::collections::BTreeMap; use std::collections::VecDeque; -use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::sync::Mutex; use std::thread; use std::thread::JoinHandle; /// Shutdown flag for background migration. -pub static SHUTDOWN_BACKGROUND_MIGRATION: once_cell::sync::Lazy = - once_cell::sync::Lazy::new(|| AtomicBool::new(false)); +pub static SHUTDOWN_BACKGROUND_MIGRATION: OnceCell = OnceCell::new(); #[cfg(test)] mod tests; @@ -34,10 +35,10 @@ pub struct Migrations { } /// Commands -#[derive(PartialEq, Eq)] +#[derive(PartialEq, Eq, Debug)] enum Command { Start, - //Stop, + Stop, } type MigrationTasks = VecDeque<(String, Arc)>; @@ -54,32 +55,37 @@ impl MigrationWorker { pub fn start(self) -> JoinHandle<()> { thread::spawn(move || { - let msg = match self.inbox.recv() { - Ok(msg) => Some(msg), - Err(_err) => return, - }; - - if let Some(Command::Start) = msg { + if let Ok(Command::Start) = self.inbox.recv() { let mut idx = 0; let migrations_count = self.tasks.lock().unwrap().len() as u64; let mpb = Arc::new(MultiProgress::new()); while let Some((name, task)) = self.tasks.lock().unwrap().pop_front() { - eprintln!("start to run migrate in background: {}", name); - let mpbc = Arc::clone(&mpb); - idx += 1; - let pb = move |count: u64| -> ProgressBar { - let pb = mpbc.add(ProgressBar::new(count)); - pb.set_draw_target(ProgressDrawTarget::term(Term::stdout(), None)); - pb.set_prefix(format!("[{}/{}]", idx, migrations_count)); - pb - }; - let db = task.migrate(self.db.clone(), Arc::new(pb)).unwrap(); - db.put_default(MIGRATION_VERSION_KEY, task.version()) - .map_err(|err| { - internal_error(format!("failed to migrate the database: {err}")) - }) - .unwrap(); + select! { + recv(self.inbox) -> msg => { + if let Ok(Command::Stop) = msg { + eprintln!("stop to run migrate in background: {}", name); + break; + } + } + default => { + eprintln!("start to run migrate in background: {}", name); + let mpbc = Arc::clone(&mpb); + idx += 1; + let pb = move |count: u64| -> ProgressBar { + let pb = mpbc.add(ProgressBar::new(count)); + pb.set_draw_target(ProgressDrawTarget::term(Term::stdout(), None)); + pb.set_prefix(format!("[{}/{}]", idx, migrations_count)); + pb + }; + let db = task.migrate(self.db.clone(), Arc::new(pb)).unwrap(); + db.put_default(MIGRATION_VERSION_KEY, task.version()) + .map_err(|err| { + internal_error(format!("failed to migrate the database: {err}")) + }) + .unwrap(); + } + } } } }) @@ -134,7 +140,7 @@ impl Migrations { .last() .unwrap_or_else(|| panic!("should have at least one version")) .version(); - debug!("latest database version [{}]", latest_version); + debug!("latest database version [{}]", latest_version); db_version.as_str().cmp(latest_version) } @@ -239,10 +245,13 @@ impl Migrations { let worker = MigrationWorker::new(tasks, db.clone(), rx); let exit_signal = ckb_stop_handler::new_crossbeam_exit_rx(); + let clone = v.to_string(); + let tx_clone = tx.clone(); thread::spawn(move || { let _ = exit_signal.recv(); - SHUTDOWN_BACKGROUND_MIGRATION.store(true, std::sync::atomic::Ordering::SeqCst); - eprintln!("set shutdown flag to true"); + let res = SHUTDOWN_BACKGROUND_MIGRATION.set(true); + let _ = tx_clone.send(Command::Stop); + eprintln!("set shutdown flag to true: {:?} version: {}", res, clone); }); let handler = worker.start(); @@ -356,7 +365,7 @@ pub trait Migration: Send + Sync { /// If a migration need to implement the recovery logic, it should check this flag periodically, /// store the migration progress when exiting and recover from the current progress when restarting. fn stop_background(&self) -> bool { - SHUTDOWN_BACKGROUND_MIGRATION.load(std::sync::atomic::Ordering::SeqCst) + *SHUTDOWN_BACKGROUND_MIGRATION.get().unwrap_or(&false) } /// Check if the background migration can be resumed. diff --git a/db-migration/src/tests.rs b/db-migration/src/tests.rs index d81d79a0eab..7c784e97083 100644 --- a/db-migration/src/tests.rs +++ b/db-migration/src/tests.rs @@ -128,6 +128,8 @@ fn test_customized_migration() { #[test] fn test_background_migration() { + use ckb_stop_handler::broadcast_exit_signals; + pub struct BackgroundMigration { version: String, } @@ -162,6 +164,35 @@ fn test_background_migration() { } } + pub struct RunStopMigration { + version: String, + } + impl Migration for RunStopMigration { + fn run_in_background(&self) -> bool { + true + } + + fn migrate( + &self, + db: RocksDB, + _pb: Arc ProgressBar + Send + Sync>, + ) -> Result { + let db_tx = db.transaction(); + loop { + if self.stop_background() { + let v = self.version.as_bytes(); + db_tx.put("1", v, &[2])?; + db_tx.commit()?; + return Ok(db); + } + } + } + + fn version(&self) -> &str { + &self.version.as_str() + } + } + let tmp_dir = tempfile::Builder::new() .prefix("test_default_migration") .tempdir() @@ -212,8 +243,9 @@ fn test_background_migration() { let db = migrations .migrate(RocksDB::open(&config, 12), true) .unwrap(); - // sleep 1 seconds - std::thread::sleep(std::time::Duration::from_secs(1)); + + // wait for background migration to finish + std::thread::sleep(std::time::Duration::from_millis(1000)); assert_eq!( b"20241127101122".to_vec(), db.get_pinned_default(MIGRATION_VERSION_KEY) @@ -238,4 +270,33 @@ fn test_background_migration() { .to_vec(); assert_eq!(v, vec![1]); } + + { + let mut migrations = Migrations::default(); + migrations.add_migration(Arc::new(RunStopMigration { + version: "20251116225943".to_string(), + })); + + let db = ReadOnlyDB::open_cf(&config.path, vec!["4"]) + .unwrap() + .unwrap(); + + assert!(migrations.can_run_in_background(&db)); + let db = migrations + .migrate(RocksDB::open(&config, 12), true) + .unwrap(); + + std::thread::sleep(std::time::Duration::from_millis(100)); + //send stop signal + broadcast_exit_signals(); + std::thread::sleep(std::time::Duration::from_millis(200)); + + let db_tx = db.transaction(); + let v = db_tx + .get_pinned("1", "20251116225943".as_bytes()) + .unwrap() + .unwrap() + .to_vec(); + assert_eq!(v, vec![2]); + } }