From 7e60e2c00d6837a6fd5a2afcafa761aa7e102bcd Mon Sep 17 00:00:00 2001 From: jbesraa Date: Wed, 12 Feb 2025 11:37:51 +0200 Subject: [PATCH] Add option to stop miner after submitting a single share --- .../mining-device-sv1/src/client.rs | 124 +++++++++++------- .../test-utils/mining-device-sv1/src/main.rs | 1 + roles/tests-integration/lib/mod.rs | 4 +- 3 files changed, 83 insertions(+), 46 deletions(-) diff --git a/roles/test-utils/mining-device-sv1/src/client.rs b/roles/test-utils/mining-device-sv1/src/client.rs index 9db42adbea..2eba670c7c 100644 --- a/roles/test-utils/mining-device-sv1/src/client.rs +++ b/roles/test-utils/mining-device-sv1/src/client.rs @@ -71,7 +71,7 @@ impl Client { /// the information from `sender_share`, it is formatted as a `v1::client_to_server::Submit` /// and then serialized into a json message that is sent to the Upstream via /// `sender_outgoing`. - pub async fn connect(client_id: u32, upstream_addr: SocketAddr) { + pub async fn connect(client_id: u32, upstream_addr: SocketAddr, single_submit: bool) { let stream = TcpStream::connect(upstream_addr).await.unwrap(); let (reader, mut writer) = stream.into_split(); @@ -86,6 +86,7 @@ impl Client { // Upstream via `sender_outgoing` let (sender_share, receiver_share) = unbounded(); + let (send_stop_miner, mut recv_stop_miner) = tokio::sync::watch::channel(false); // Instantiates a new `Miner` (a mock of an actual Mining Device) with a job id of 0. let miner = Arc::new(Mutex::new(Miner::new(0))); @@ -103,32 +104,53 @@ impl Client { // Reads messages sent by the Upstream from the socket to be passed to the // `receiver_incoming` + + let mut recv_stop_miner_1 = recv_stop_miner.clone(); task::spawn(async move { - let mut messages = BufReader::new(reader).lines(); - while let Ok(message) = messages.next_line().await { - match message { + tokio::select!( + _ = recv_stop_miner_1.changed() => { + warn!("Stopping miner") + }, + _ = async { + let mut messages = BufReader::new(reader).lines(); + while let Ok(message) = messages.next_line().await { + match message { Some(msg) => { - if let Err(e) = sender_incoming.send(msg).await { - error!("Failed to send message to receiver_incoming: {:?}", e); - break; // Exit the loop if sending fails - } + if let Err(e) = sender_incoming.send(msg).await { + error!("Failed to send message to receiver_incoming: {:?}", e); + break; // Exit the loop if sending fails + } } None => { - error!("Error reading from socket"); - break; // Exit the loop on read failure + error!("Error reading from socket"); + break; // Exit the loop on read failure } + } } - } - warn!("Reader task terminated."); + warn!("Reader task terminated."); + } => {} + ); }); // Waits to receive a message from `sender_outgoing` and writes it to the socket for the // Upstream to receive + let mut recv_stop_miner_2 = recv_stop_miner.clone(); task::spawn(async move { - loop { - let message: String = receiver_outgoing.recv().await.unwrap(); - (writer).write_all(message.as_bytes()).await.unwrap(); - } + tokio::select!( + _ = recv_stop_miner_2.changed() => { + warn!("Stopping miner") + }, + _ = async { + loop { + let message: String = receiver_outgoing.recv().await.unwrap(); + (writer).write_all(message.as_bytes()).await.unwrap(); + if message.contains("mining.submit") && single_submit { + dbg!("Stopping miner here"); + send_stop_miner.send(true).unwrap(); + } + } + } => {} + ) }); // Clone the sender to the Upstream node to use it in another task below as @@ -170,43 +192,54 @@ impl Client { // Sends relevant candidate block header values needed to construct a // `mining.submit` message to the `receiver_share` in the task that is responsible // for sending messages to the Upstream node. - sender_share + if sender_share .try_send((nonce, job_id.unwrap(), version.unwrap(), time)) - .unwrap(); + .is_err() + { + warn!("Share channel is not available"); + break; + } } miner_cloned .safe_lock(|m| m.header.as_mut().map(|h| h.nonce += 1)) .unwrap(); }); - // Task to receive relevant candidate block header values needed to construct a // `mining.submit` message. This message is contructed as a `client_to_server::Submit` and // then serialized into json to be sent to the Upstream via the `sender_outgoing` sender. let cloned = client.clone(); task::spawn(async move { - let recv = receiver_share.clone(); - loop { - let (nonce, job_id, _version, ntime) = recv.recv().await.unwrap(); - if cloned.clone().safe_lock(|c| c.status).unwrap() != ClientStatus::Subscribed { - continue; - } - let extra_nonce2: Extranonce = - vec![0; cloned.safe_lock(|c| c.extranonce2_size.unwrap()).unwrap()] - .try_into() - .unwrap(); - let submit = client_to_server::Submit { - id: 0, - user_name: "user".into(), // TODO: user name should NOT be hardcoded - job_id: job_id.to_string(), - extra_nonce2, - time: HexU32Be(ntime), - nonce: HexU32Be(nonce), - version_bits: None, - }; - let message: json_rpc::Message = submit.into(); - let message = format!("{}\n", serde_json::to_string(&message).unwrap()); - sender_outgoing_clone.send(message).await.unwrap(); - } + tokio::select!( + _ = recv_stop_miner.changed() => { + warn!("Stopping miner"); + return; + }, + _ = async { + let recv = receiver_share.clone(); + loop { + let (nonce, job_id, _version, ntime) = recv.recv().await.unwrap(); + if cloned.clone().safe_lock(|c| c.status).unwrap() != ClientStatus::Subscribed { + continue; + } + let extra_nonce2: Extranonce = + vec![0; cloned.safe_lock(|c| c.extranonce2_size.unwrap()).unwrap()] + .try_into() + .unwrap(); + let submit = client_to_server::Submit { + id: 0, + user_name: "user".into(), // TODO: user name should NOT be hardcoded + job_id: job_id.to_string(), + extra_nonce2, + time: HexU32Be(ntime), + nonce: HexU32Be(nonce), + version_bits: None, + }; + let message: json_rpc::Message = submit.into(); + let message = format!("{}\n", serde_json::to_string(&message).unwrap()); + sender_outgoing_clone.send(message).await.unwrap(); + } + } => {} + ) }); let recv_incoming = client.safe_lock(|c| c.receiver_incoming.clone()).unwrap(); @@ -226,8 +259,11 @@ impl Client { // Waits for the `sender_incoming` to get message line from socket to be parsed by the // `Client` loop { - let incoming = recv_incoming.recv().await.unwrap(); - Self::parse_message(client.clone(), Ok(incoming)).await; + if let Ok(incoming) = recv_incoming.clone().recv().await { + Self::parse_message(client.clone(), Ok(incoming)).await; + } else { + warn!("Error reading from socket via `recv_incoming` channel") + } } } diff --git a/roles/test-utils/mining-device-sv1/src/main.rs b/roles/test-utils/mining-device-sv1/src/main.rs index 0c831227cf..ad41cf495c 100644 --- a/roles/test-utils/mining-device-sv1/src/main.rs +++ b/roles/test-utils/mining-device-sv1/src/main.rs @@ -13,6 +13,7 @@ async fn main() { Client::connect( 80, SocketAddr::from_str(ADDR).expect("Invalid upstream address"), + false, ) .await } diff --git a/roles/tests-integration/lib/mod.rs b/roles/tests-integration/lib/mod.rs index c6cb7354c6..979ce0550e 100644 --- a/roles/tests-integration/lib/mod.rs +++ b/roles/tests-integration/lib/mod.rs @@ -281,9 +281,9 @@ pub fn measure_hashrate(duration_secs: u64) -> f64 { hashes as f64 / elapsed_secs } -pub async fn start_mining_device_sv1(upstream_addr: SocketAddr) { +pub async fn start_mining_device_sv1(upstream_addr: SocketAddr, single_submit: bool) { tokio::spawn(async move { - mining_device_sv1::client::Client::connect(80, upstream_addr).await; + mining_device_sv1::client::Client::connect(80, upstream_addr, single_submit).await; }); tokio::time::sleep(std::time::Duration::from_secs(3)).await; }