Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add threadpool to join the recovery threads #316

Merged
merged 1 commit into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 68 additions & 22 deletions src/maker/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,6 @@
//! contract broadcasts and handle idle Taker connections. Additionally, it handles recovery by broadcasting
//! contract transactions and claiming funds after an unsuccessful swap event.

use std::{
collections::HashMap,
net::IpAddr,
path::PathBuf,
sync::{
atomic::{AtomicBool, Ordering::Relaxed},
Arc, Mutex, RwLock,
},
time::{Duration, Instant},
};

use bip39::Mnemonic;
use bitcoin::{
ecdsa::Signature,
secp256k1::{self, Secp256k1},
OutPoint, PublicKey, ScriptBuf, Transaction,
};
use bitcoind::bitcoincore_rpc::RpcApi;

use crate::{
protocol::{
contract::check_hashvalues_are_equal,
Expand All @@ -36,6 +17,24 @@
},
wallet::{RPCConfig, SwapCoin, WalletSwapCoin},
};
use bip39::Mnemonic;
use bitcoin::{
ecdsa::Signature,
secp256k1::{self, Secp256k1},
OutPoint, PublicKey, ScriptBuf, Transaction,
};
use bitcoind::bitcoincore_rpc::RpcApi;
use std::{
collections::HashMap,
net::IpAddr,
path::PathBuf,
sync::{
atomic::{AtomicBool, Ordering::Relaxed},
Arc, Mutex, RwLock,
},
thread::JoinHandle,
time::{Duration, Instant},
};

use crate::{
protocol::{
Expand Down Expand Up @@ -92,6 +91,48 @@
pub pending_funding_txes: Vec<Transaction>,
}

pub struct ThreadPool {
pub threads: Mutex<Vec<JoinHandle<()>>>,
}

impl Default for ThreadPool {
fn default() -> Self {
Self::new()

Check warning on line 100 in src/maker/api.rs

View check run for this annotation

Codecov / codecov/patch

src/maker/api.rs#L99-L100

Added lines #L99 - L100 were not covered by tests
}
}

impl Drop for ThreadPool {
fn drop(&mut self) {
if let Err(e) = self.join_all_threads() {
log::error!("Error joining threads in via drop: {:?}", e);

Check warning on line 107 in src/maker/api.rs

View check run for this annotation

Codecov / codecov/patch

src/maker/api.rs#L107

Added line #L107 was not covered by tests
}
}
}
claddyy marked this conversation as resolved.
Show resolved Hide resolved

impl ThreadPool {
pub fn new() -> Self {
claddyy marked this conversation as resolved.
Show resolved Hide resolved
Self {
threads: Mutex::new(Vec::new()),
}
}

pub fn add_thread(&self, handle: JoinHandle<()>) {
let mut threads = self.threads.lock().unwrap();
threads.push(handle);
}
#[inline]
fn join_all_threads(&self) -> Result<(), MakerError> {
let mut threads = self
.threads
.lock()
.map_err(|_| MakerError::General("Failed to lock threads"))?;
while let Some(thread) = threads.pop() {
thread.join().unwrap();
claddyy marked this conversation as resolved.
Show resolved Hide resolved
claddyy marked this conversation as resolved.
Show resolved Hide resolved
}
Ok(())
}
}

/// Represents the maker in the swap protocol.
pub struct Maker {
/// Defines special maker behavior, only applicable for testing
Expand All @@ -108,6 +149,8 @@
pub highest_fidelity_proof: RwLock<Option<FidelityProof>>,
/// Is setup complete
pub is_setup_complete: AtomicBool,
/// Thread pool for managing all spawned threads
pub thread_pool: Arc<ThreadPool>,
claddyy marked this conversation as resolved.
Show resolved Hide resolved
}

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -222,6 +265,7 @@
connection_state: Mutex::new(HashMap::new()),
highest_fidelity_proof: RwLock::new(None),
is_setup_complete: AtomicBool::new(false),
thread_pool: Arc::new(ThreadPool::new()),
})
}

Expand Down Expand Up @@ -466,9 +510,10 @@
"[{}] Spawning recovery thread after seeing contracts in mempool",
maker.config.port
);
std::thread::spawn(move || {
let handle = std::thread::spawn(move || {
recover_from_swap(maker_clone, outgoings, incomings).unwrap();
claddyy marked this conversation as resolved.
Show resolved Hide resolved
});
maker.thread_pool.add_thread(handle);
// Clear the state value here
*connection_state = ConnectionState::default();
break;
Expand Down Expand Up @@ -547,9 +592,10 @@
"[{}] Spawning recovery thread after Taker dropped",
maker.config.port
);
std::thread::spawn(move || {
recover_from_swap(maker_clone, outgoings, incomings).unwrap();
let handle = std::thread::spawn(move || {
recover_from_swap(maker_clone, outgoings, incomings).unwrap()
});
maker.thread_pool.add_thread(handle);
// Clear the state values here
*state = ConnectionState::default();
break;
Expand Down
5 changes: 3 additions & 2 deletions src/maker/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -678,9 +678,10 @@ fn unexpected_recovery(maker: Arc<Maker>) -> Result<(), MakerError> {
}
// Spawn a separate thread to wait for contract maturity and broadcasting timelocked.
let maker_clone = maker.clone();
std::thread::spawn(move || {
recover_from_swap(maker_clone, outgoings, incomings).unwrap();
let handle = std::thread::spawn(move || {
recover_from_swap(maker_clone, outgoings, incomings).unwrap()
});
maker.thread_pool.add_thread(handle);
}
Ok(())
}
61 changes: 27 additions & 34 deletions src/maker/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,23 +433,22 @@
// Global server Mutex, to switch on/off p2p network.
let accepting_clients = Arc::new(AtomicBool::new(false));

claddyy marked this conversation as resolved.
Show resolved Hide resolved
// Spawn Server threads.
// All thread handles are stored in the thread_pool, which are all joined at server shutdown.
let mut thread_pool = Vec::new();

if !maker.shutdown.load(Relaxed) {
// 1. Bitcoin Core Connection checker thread.
// Ensures that Bitcoin Core connection is live.
// If not, it will block p2p connections until Core works again.
let maker_clone = maker.clone();
let acc_client_clone = accepting_clients.clone();
let conn_check_thread: thread::JoinHandle<Result<(), MakerError>> = thread::Builder::new()
let conn_check_thread = thread::Builder::new()
.name("Bitcoin Core Connection Checker Thread".to_string())
.spawn(move || {
log::info!("[{}] Spawning Bitcoin Core connection checker thread", port);
check_connection_with_core(maker_clone, acc_client_clone)
if let Err(e) = check_connection_with_core(maker_clone.clone(), acc_client_clone) {
log::error!("[{}] Bitcoin Core connection check failed: {:?}", port, e);
maker_clone.shutdown.store(true, Relaxed);

Check warning on line 448 in src/maker/server.rs

View check run for this annotation

Codecov / codecov/patch

src/maker/server.rs#L447-L448

Added lines #L447 - L448 were not covered by tests
}
})?;
thread_pool.push(conn_check_thread);
maker.thread_pool.add_thread(conn_check_thread);

// 2. Idle Client connection checker thread.
// This threads check idelness of peer in live swaps.
Expand All @@ -462,9 +461,12 @@
"[{}] Spawning Client connection status checker thread",
port
);
check_for_idle_states(maker_clone.clone())
if let Err(e) = check_for_idle_states(maker_clone.clone()) {
log::error!("Failed checking client's idle state {:?}", e);
maker_clone.shutdown.store(true, Relaxed);
}
})?;
thread_pool.push(idle_conn_check_thread);
maker.thread_pool.add_thread(idle_conn_check_thread);

// 3. Watchtower thread.
// This thread checks for broadcasted contract transactions, which usually means violation of the protocol.
Expand All @@ -475,9 +477,12 @@
.name("Contract Watcher Thread".to_string())
.spawn(move || {
log::info!("[{}] Spawning contract-watcher thread", port);
check_for_broadcasted_contracts(maker_clone.clone())
if let Err(e) = check_for_broadcasted_contracts(maker_clone.clone()) {
maker_clone.shutdown.store(true, Relaxed);
log::error!("Failed checking broadcasted contracts {:?}", e);

Check warning on line 482 in src/maker/server.rs

View check run for this annotation

Codecov / codecov/patch

src/maker/server.rs#L481-L482

Added lines #L481 - L482 were not covered by tests
}
})?;
thread_pool.push(contract_watcher_thread);
maker.thread_pool.add_thread(contract_watcher_thread);

// 4: The RPC server thread.
// User for responding back to `maker-cli` apps.
Expand All @@ -486,10 +491,16 @@
.name("RPC Thread".to_string())
.spawn(move || {
log::info!("[{}] Spawning RPC server thread", port);
start_rpc_server(maker_clone)
match start_rpc_server(maker_clone.clone()) {
Ok(_) => (),
Err(e) => {
log::error!("Failed starting rpc server {:?}", e);
maker_clone.shutdown.store(true, Relaxed);

Check warning on line 498 in src/maker/server.rs

View check run for this annotation

Codecov / codecov/patch

src/maker/server.rs#L496-L498

Added lines #L496 - L498 were not covered by tests
}
}
})?;

thread_pool.push(rpc_thread);
maker.thread_pool.add_thread(rpc_thread);

sleep(Duration::from_secs(heart_beat_interval)); // wait for 1 beat, to complete spawns of all the threads.
maker.is_setup_complete.store(true, Relaxed);
Expand All @@ -516,18 +527,15 @@
match listener.accept() {
Ok((mut stream, client_addr)) => {
log::info!("[{}] Spawning Client Handler thread", maker.config.port);

let maker_for_handler = maker.clone();
let client_handler_thread = thread::Builder::new()
.name("Client Handler Thread".to_string())
.spawn(move || {
if let Err(e) = handle_client(maker, &mut stream, client_addr) {
if let Err(e) = handle_client(maker_for_handler, &mut stream, client_addr) {
log::error!("[{}] Error Handling client request {:?}", port, e);
Err(e)
} else {
Ok(())
}
})?;
thread_pool.push(client_handler_thread);
maker.thread_pool.add_thread(client_handler_thread);
}

Err(e) => {
Expand All @@ -549,21 +557,6 @@

log::info!("[{}] Maker is shutting down.", port);

// Shuting down. Join all the threads.
for thread in thread_pool {
log::info!(
"[{}] Closing Thread: {}",
port,
thread.thread().name().expect("Thread name expected")
);
let join_result = thread.join();
if let Ok(r) = join_result {
log::info!("[{}] Thread closing result: {:?}", port, r)
} else if let Err(e) = join_result {
log::info!("[{}] error in internal thread: {:?}", port, e);
}
}

claddyy marked this conversation as resolved.
Show resolved Hide resolved
if maker.config.connection_type == ConnectionType::TOR && cfg!(feature = "tor") {
crate::tor::kill_tor_handles(tor_thread.expect("Tor thread expected"));
}
Expand Down
Loading