Skip to content

Commit

Permalink
Remove atomic-option lib
Browse files Browse the repository at this point in the history
  • Loading branch information
doyoubi committed Aug 6, 2022
1 parent 50cdb6c commit 057414c
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 50 deletions.
7 changes: 0 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ tokio-util = { version = "0.7", features = ["codec"] }
tokio-stream = { version = "0.1", features = ["net"] }
warp = { version = "0.3", features = ["compression"] }
futures = "0.3"
atomic-option = "0.1"
crc16 = "0.4"
crc64 = "2"
caseless = "0.2"
Expand Down
15 changes: 7 additions & 8 deletions src/common/resp_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ use crate::common::utils::pretty_print_bytes;
use crate::protocol::{
BinSafeStr, OptionalMulti, RedisClient, RedisClientError, RedisClientFactory, Resp, RespVec,
};
use atomic_option::AtomicOption;
use futures::channel::oneshot;
use futures::{select, Future, FutureExt};
use parking_lot::Mutex;
use std::pin::Pin;
use std::str;
use std::sync::atomic;
Expand Down Expand Up @@ -169,8 +169,8 @@ type RetrieverFut = Pin<Box<dyn Future<Output = Result<(), RedisClientError>> +

pub struct I64Retriever<F: RedisClientFactory> {
data: Arc<atomic::AtomicI64>,
stop_signal_sender: AtomicOption<oneshot::Sender<()>>,
stop_signal_receiver: AtomicOption<oneshot::Receiver<()>>,
stop_signal_sender: Arc<Mutex<Option<oneshot::Sender<()>>>>,
stop_signal_receiver: Arc<Mutex<Option<oneshot::Receiver<()>>>>,
client_factory: Arc<F>,
}

Expand All @@ -179,8 +179,8 @@ impl<F: RedisClientFactory> I64Retriever<F> {
let (sender, receiver) = oneshot::channel();
let data = Arc::new(atomic::AtomicI64::new(init_data));

let stop_signal_sender = AtomicOption::new(Box::new(sender));
let stop_signal_receiver = AtomicOption::new(Box::new(receiver));
let stop_signal_sender = Arc::new(Mutex::new(Some(sender)));
let stop_signal_receiver = Arc::new(Mutex::new(Some(receiver)));
Self {
data,
stop_signal_sender,
Expand All @@ -207,8 +207,7 @@ impl<F: RedisClientFactory> I64Retriever<F> {
+ Sync
+ 'static,
{
if let Some(stop_signal_receiver) = self.stop_signal_receiver.take(atomic::Ordering::SeqCst)
{
if let Some(stop_signal_receiver) = self.stop_signal_receiver.lock().take() {
let data_clone = self.data.clone();
let handle_result = move |resp: RespVec| -> Result<(), RedisClientError> {
handle_func(resp, &data_clone)
Expand Down Expand Up @@ -245,7 +244,7 @@ impl<F: RedisClientFactory> I64Retriever<F> {
}

pub fn try_stop(&self) -> bool {
match self.stop_signal_sender.take(atomic::Ordering::SeqCst) {
match self.stop_signal_sender.lock().take() {
Some(sender) => sender.send(()).is_ok(),
None => false,
}
Expand Down
14 changes: 7 additions & 7 deletions src/migration/scan_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ use crate::protocol::{
Resp, RespVec,
};
use crate::proxy::backend::CmdTask;
use atomic_option::AtomicOption;
use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
use futures::{future, Future, FutureExt, StreamExt};
use parking_lot::Mutex;
use std::cmp::min;
use std::collections::HashSet;
use std::num::NonZeroUsize;
Expand Down Expand Up @@ -65,8 +65,8 @@ struct DataEntry {
type MgrFut = Pin<Box<dyn Future<Output = Result<(), MigrationError>> + Send>>;

pub struct ScanMigrationTask<T: CmdTask, F: RedisClientFactory> {
handle: AtomicOption<FutureAutoStopHandle>, // once this task get dropped, the future will stop.
fut: AtomicOption<MgrFut>,
handle: Arc<Mutex<Option<FutureAutoStopHandle>>>, // once this task get dropped, the future will stop.
fut: Arc<Mutex<Option<MgrFut>>>,
sync_tasks_sender: UnboundedSender<T>,
src_address: String,
dst_address: String,
Expand Down Expand Up @@ -106,8 +106,8 @@ impl<T: CmdTask, F: RedisClientFactory> ScanMigrationTask<T, F> {
const POOL_SIZE: usize = 1024;

Self {
handle: AtomicOption::new(Box::new(fut_handle)),
fut: AtomicOption::new(Box::new(fut)),
handle: Arc::new(Mutex::new(Some(fut_handle))),
fut: Arc::new(Mutex::new(Some(fut))),
sync_tasks_sender: sender,
src_address,
dst_address,
Expand Down Expand Up @@ -264,11 +264,11 @@ impl<T: CmdTask, F: RedisClientFactory> ScanMigrationTask<T, F> {
}

pub fn start(&self) -> Option<MgrFut> {
self.fut.take(Ordering::SeqCst).map(|t| *t)
self.fut.lock().take()
}

pub fn stop(&self) -> bool {
self.handle.take(Ordering::SeqCst).is_some()
self.handle.lock().take().is_some()
}

fn handle_forward(opt_multi_resp: OptionalMulti<RespVec>) -> Result<(), RedisClientError> {
Expand Down
27 changes: 13 additions & 14 deletions src/migration/scan_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,12 @@ use crate::proxy::command::CmdTypeTuple;
use crate::proxy::migration_backend::{RestoreDataCmdTaskHandler, WaitableTask};
use crate::proxy::sender::{CmdTaskSender, CmdTaskSenderFactory};
use crate::proxy::service::ServerProxyConfig;
use atomic_option::AtomicOption;
use futures::channel::oneshot;
use futures::future::BoxFuture;
use futures::{future, select, Future, FutureExt, TryFutureExt};
use parking_lot::Mutex;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

Expand All @@ -54,8 +53,8 @@ where
meta: MigrationMeta,
state: Arc<AtomicMigrationState>,
client_factory: Arc<RCF>,
stop_signal_sender: AtomicOption<oneshot::Sender<()>>,
stop_signal_receiver: AtomicOption<oneshot::Receiver<()>>,
stop_signal_sender: Arc<Mutex<Option<oneshot::Sender<()>>>>,
stop_signal_receiver: Arc<Mutex<Option<oneshot::Receiver<()>>>>,
task: Arc<ScanMigrationTask<T, RCF>>,
blocking_ctrl: Arc<BC>,
phantom: PhantomData<T>,
Expand Down Expand Up @@ -98,8 +97,8 @@ where
meta,
state: Arc::new(AtomicMigrationState::initial_state()),
client_factory,
stop_signal_sender: AtomicOption::new(Box::new(stop_signal_sender)),
stop_signal_receiver: AtomicOption::new(Box::new(stop_signal_receiver)),
stop_signal_sender: Arc::new(Mutex::new(Some(stop_signal_sender))),
stop_signal_receiver: Arc::new(Mutex::new(Some(stop_signal_receiver))),
task: Arc::new(task),
blocking_ctrl,
phantom: PhantomData,
Expand Down Expand Up @@ -367,7 +366,7 @@ where
fn start<'s>(
&'s self,
) -> Pin<Box<dyn Future<Output = Result<(), MigrationError>> + Send + 's>> {
let receiver = match self.stop_signal_receiver.take(Ordering::SeqCst) {
let receiver = match self.stop_signal_receiver.lock().take() {
Some(r) => r,
None => return Box::pin(future::err(MigrationError::AlreadyStarted)),
};
Expand Down Expand Up @@ -454,7 +453,7 @@ where
let handle = MigratingTaskHandle {
task: self.task.clone(),
meta: self.meta.clone(),
stop_signal_sender: Some(*self.stop_signal_sender.take(Ordering::SeqCst)?),
stop_signal_sender: Some(self.stop_signal_sender.lock().take()?),
};
Some(Box::new(handle))
}
Expand Down Expand Up @@ -503,8 +502,8 @@ where
state: Arc<AtomicMigrationState>,
_client_factory: Arc<RCF>,
_sender_factory: Arc<TSF>,
stop_signal_sender: AtomicOption<oneshot::Sender<()>>,
stop_signal_receiver: AtomicOption<oneshot::Receiver<()>>,
stop_signal_sender: Arc<Mutex<Option<oneshot::Sender<()>>>>,
stop_signal_receiver: Arc<Mutex<Option<oneshot::Receiver<()>>>>,
cmd_handler: RestoreDataCmdTaskHandler<
CTF,
<TSF as CmdTaskSenderFactory>::Sender,
Expand Down Expand Up @@ -561,8 +560,8 @@ where
state: Arc::new(AtomicMigrationState::initial_state()),
_client_factory: client_factory,
_sender_factory: sender_factory,
stop_signal_sender: AtomicOption::new(Box::new(stop_signal_sender)),
stop_signal_receiver: AtomicOption::new(Box::new(stop_signal_receiver)),
stop_signal_sender: Arc::new(Mutex::new(Some(stop_signal_sender))),
stop_signal_receiver: Arc::new(Mutex::new(Some(stop_signal_receiver))),
cmd_handler,
_cmd_task_factory: cmd_task_factory,
active_redirection,
Expand All @@ -588,7 +587,7 @@ where
fn start<'s>(
&'s self,
) -> Pin<Box<dyn Future<Output = Result<(), MigrationError>> + Send + 's>> {
let receiver = match self.stop_signal_receiver.take(Ordering::SeqCst) {
let receiver = match self.stop_signal_receiver.lock().take() {
Some(r) => r,
None => return Box::pin(future::err(MigrationError::AlreadyStarted)),
};
Expand Down Expand Up @@ -649,7 +648,7 @@ where
fn get_stop_handle(&self) -> Option<Box<dyn Drop + Send + Sync + 'static>> {
let handle = ImportingTaskHandle {
meta: self.meta.clone(),
stop_signal_sender: Some(*self.stop_signal_sender.take(Ordering::SeqCst)?),
stop_signal_sender: Some(self.stop_signal_sender.lock().take()?),
};
Some(Box::new(handle))
}
Expand Down
29 changes: 16 additions & 13 deletions src/proxy/migration_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::migration::scan_migration::{pttl_to_restore_expire_time, PTTL_KEY_NOT
use crate::migration::stats::MigrationStats;
use crate::protocol::{Array, BinSafeStr, BulkStr, RFunctor, Resp, RespVec, VFunctor};
use arc_swap::ArcSwapOption;
use atomic_option::AtomicOption;
use dashmap::DashSet;
use either::Either;
use futures::channel::{
Expand Down Expand Up @@ -490,15 +489,19 @@ where
umsync_task_sender: UmSyncTaskSender<F>,
del_task_sender: DeleteKeyTaskSender,
#[allow(clippy::type_complexity)]
task_receivers: AtomicOption<(
ExistsTaskReceiver<F>,
DumpPttlTaskReceiver<F>,
RestoreTaskReceiver<F>,
PendingUmSyncTaskReceiver<F::Task>,
UmSyncTaskReceiver<F>,
DeleteKeyTaskReceiver,
WaitHandle,
)>,
task_receivers: Arc<
parking_lot::Mutex<
Option<(
ExistsTaskReceiver<F>,
DumpPttlTaskReceiver<F>,
RestoreTaskReceiver<F>,
PendingUmSyncTaskReceiver<F::Task>,
UmSyncTaskReceiver<F>,
DeleteKeyTaskReceiver,
WaitHandle,
)>,
>,
>,
cmd_task_factory: Arc<F>,
key_lock: Arc<KeyLock>,
stats: Arc<MigrationStats>,
Expand Down Expand Up @@ -530,15 +533,15 @@ where
let (umsync_task_sender, umsync_task_receiver) = unbounded();
let (del_task_sender, del_task_receiver) = unbounded();
let (registry, wait_handle) = WaitRegistry::new();
let task_receivers = AtomicOption::new(Box::new((
let task_receivers = Arc::new(parking_lot::Mutex::new(Some((
exists_task_receiver,
dump_pttl_task_receiver,
restore_task_receiver,
pending_umsync_task_receiver,
umsync_task_receiver,
del_task_receiver,
wait_handle,
)));
))));
let key_lock = Arc::new(KeyLock::new(LOCK_SHARD_SIZE));
Self {
src_sender,
Expand Down Expand Up @@ -578,7 +581,7 @@ where
let cmd_task_factory = self.cmd_task_factory.clone();
let key_lock = self.key_lock.clone();

let receiver_opt = self.task_receivers.take(Ordering::SeqCst).map(|p| *p);
let receiver_opt = self.task_receivers.lock().take();
let (
exists_task_receiver,
dump_pttl_task_receiver,
Expand Down

0 comments on commit 057414c

Please sign in to comment.