Skip to content

Commit

Permalink
use OnceCell for flag
Browse files Browse the repository at this point in the history
  • Loading branch information
chenyukang committed Nov 21, 2023
1 parent 5d86765 commit 669d611
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 33 deletions.
71 changes: 40 additions & 31 deletions db-migration/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
//! TODO(doc): @quake
use ckb_channel::{unbounded, Receiver};
use ckb_channel::select;
use ckb_channel::unbounded;
use ckb_channel::Receiver;
use ckb_db::{ReadOnlyDB, RocksDB};
use ckb_db_schema::{COLUMN_META, META_TIP_HEADER_KEY, MIGRATION_VERSION_KEY};
use ckb_error::{Error, InternalErrorKind};
use ckb_logger::{debug, error, info};
use ckb_stop_handler::register_thread;
use console::Term;
pub use indicatif::{HumanDuration, MultiProgress, ProgressBar, ProgressDrawTarget, ProgressStyle};
use once_cell::sync::OnceCell;
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
use std::thread::JoinHandle;

/// Shutdown flag for background migration.
pub static SHUTDOWN_BACKGROUND_MIGRATION: once_cell::sync::Lazy<AtomicBool> =
once_cell::sync::Lazy::new(|| AtomicBool::new(false));
pub static SHUTDOWN_BACKGROUND_MIGRATION: OnceCell<bool> = OnceCell::new();

#[cfg(test)]
mod tests;
Expand All @@ -34,10 +35,10 @@ pub struct Migrations {
}

/// Commands
#[derive(PartialEq, Eq)]
#[derive(PartialEq, Eq, Debug)]
enum Command {
Start,
//Stop,
Stop,
}

type MigrationTasks = VecDeque<(String, Arc<dyn Migration>)>;
Expand All @@ -54,32 +55,37 @@ impl MigrationWorker {

pub fn start(self) -> JoinHandle<()> {
thread::spawn(move || {
let msg = match self.inbox.recv() {
Ok(msg) => Some(msg),
Err(_err) => return,
};

if let Some(Command::Start) = msg {
if let Ok(Command::Start) = self.inbox.recv() {
let mut idx = 0;
let migrations_count = self.tasks.lock().unwrap().len() as u64;
let mpb = Arc::new(MultiProgress::new());

while let Some((name, task)) = self.tasks.lock().unwrap().pop_front() {
eprintln!("start to run migrate in background: {}", name);
let mpbc = Arc::clone(&mpb);
idx += 1;
let pb = move |count: u64| -> ProgressBar {
let pb = mpbc.add(ProgressBar::new(count));
pb.set_draw_target(ProgressDrawTarget::term(Term::stdout(), None));
pb.set_prefix(format!("[{}/{}]", idx, migrations_count));
pb
};
let db = task.migrate(self.db.clone(), Arc::new(pb)).unwrap();
db.put_default(MIGRATION_VERSION_KEY, task.version())
.map_err(|err| {
internal_error(format!("failed to migrate the database: {err}"))
})
.unwrap();
select! {
recv(self.inbox) -> msg => {
if let Ok(Command::Stop) = msg {
eprintln!("stop to run migrate in background: {}", name);
break;
}
}
default => {
eprintln!("start to run migrate in background: {}", name);
let mpbc = Arc::clone(&mpb);
idx += 1;
let pb = move |count: u64| -> ProgressBar {
let pb = mpbc.add(ProgressBar::new(count));
pb.set_draw_target(ProgressDrawTarget::term(Term::stdout(), None));
pb.set_prefix(format!("[{}/{}]", idx, migrations_count));
pb
};
let db = task.migrate(self.db.clone(), Arc::new(pb)).unwrap();
db.put_default(MIGRATION_VERSION_KEY, task.version())
.map_err(|err| {
internal_error(format!("failed to migrate the database: {err}"))
})
.unwrap();
}
}
}
}
})
Expand Down Expand Up @@ -134,7 +140,7 @@ impl Migrations {
.last()
.unwrap_or_else(|| panic!("should have at least one version"))
.version();
debug!("latest database version [{}]", latest_version);
debug!("latest database version [{}]", latest_version);

db_version.as_str().cmp(latest_version)
}
Expand Down Expand Up @@ -239,10 +245,13 @@ impl Migrations {
let worker = MigrationWorker::new(tasks, db.clone(), rx);

let exit_signal = ckb_stop_handler::new_crossbeam_exit_rx();
let clone = v.to_string();
let tx_clone = tx.clone();
thread::spawn(move || {
let _ = exit_signal.recv();
SHUTDOWN_BACKGROUND_MIGRATION.store(true, std::sync::atomic::Ordering::SeqCst);
eprintln!("set shutdown flag to true");
let res = SHUTDOWN_BACKGROUND_MIGRATION.set(true);
let _ = tx_clone.send(Command::Stop);
eprintln!("set shutdown flag to true: {:?} version: {}", res, clone);
});

let handler = worker.start();
Expand Down Expand Up @@ -356,7 +365,7 @@ pub trait Migration: Send + Sync {
/// If a migration need to implement the recovery logic, it should check this flag periodically,
/// store the migration progress when exiting and recover from the current progress when restarting.
fn stop_background(&self) -> bool {
SHUTDOWN_BACKGROUND_MIGRATION.load(std::sync::atomic::Ordering::SeqCst)
*SHUTDOWN_BACKGROUND_MIGRATION.get().unwrap_or(&false)
}

/// Check if the background migration can be resumed.
Expand Down
65 changes: 63 additions & 2 deletions db-migration/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ fn test_customized_migration() {

#[test]
fn test_background_migration() {
use ckb_stop_handler::broadcast_exit_signals;

pub struct BackgroundMigration {
version: String,
}
Expand Down Expand Up @@ -162,6 +164,35 @@ fn test_background_migration() {
}
}

pub struct RunStopMigration {
version: String,
}
impl Migration for RunStopMigration {
fn run_in_background(&self) -> bool {
true
}

fn migrate(
&self,
db: RocksDB,
_pb: Arc<dyn Fn(u64) -> ProgressBar + Send + Sync>,
) -> Result<RocksDB, Error> {
let db_tx = db.transaction();
loop {
if self.stop_background() {
let v = self.version.as_bytes();
db_tx.put("1", v, &[2])?;
db_tx.commit()?;
return Ok(db);
}
}
}

fn version(&self) -> &str {
&self.version.as_str()
}
}

let tmp_dir = tempfile::Builder::new()
.prefix("test_default_migration")
.tempdir()
Expand Down Expand Up @@ -212,8 +243,9 @@ fn test_background_migration() {
let db = migrations
.migrate(RocksDB::open(&config, 12), true)
.unwrap();
// sleep 1 seconds
std::thread::sleep(std::time::Duration::from_secs(1));

// wait for background migration to finish
std::thread::sleep(std::time::Duration::from_millis(1000));
assert_eq!(
b"20241127101122".to_vec(),
db.get_pinned_default(MIGRATION_VERSION_KEY)
Expand All @@ -238,4 +270,33 @@ fn test_background_migration() {
.to_vec();
assert_eq!(v, vec![1]);
}

{
let mut migrations = Migrations::default();
migrations.add_migration(Arc::new(RunStopMigration {
version: "20251116225943".to_string(),
}));

let db = ReadOnlyDB::open_cf(&config.path, vec!["4"])
.unwrap()
.unwrap();

assert!(migrations.can_run_in_background(&db));
let db = migrations
.migrate(RocksDB::open(&config, 12), true)
.unwrap();

std::thread::sleep(std::time::Duration::from_millis(100));
//send stop signal
broadcast_exit_signals();
std::thread::sleep(std::time::Duration::from_millis(200));

let db_tx = db.transaction();
let v = db_tx
.get_pinned("1", "20251116225943".as_bytes())
.unwrap()
.unwrap()
.to_vec();
assert_eq!(v, vec![2]);
}
}

0 comments on commit 669d611

Please sign in to comment.