Skip to content

Commit

Permalink
Add option to stop miner after submitting a single
Browse files Browse the repository at this point in the history
share
  • Loading branch information
jbesraa committed Feb 14, 2025
1 parent 68b388c commit 529267a
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 32 deletions.
75 changes: 46 additions & 29 deletions roles/test-utils/mining-device-sv1/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl Client {
/// the information from `sender_share`, it is formatted as a `v1::client_to_server::Submit`
/// and then serialized into a json message that is sent to the Upstream via
/// `sender_outgoing`.
pub async fn connect(client_id: u32, upstream_addr: SocketAddr) {
pub async fn connect(client_id: u32, upstream_addr: SocketAddr, single_submit: bool) {
let stream = TcpStream::connect(upstream_addr).await.unwrap();
let (reader, mut writer) = stream.into_split();

Expand All @@ -86,6 +86,7 @@ impl Client {
// Upstream via `sender_outgoing`
let (sender_share, receiver_share) = unbounded();

let (send_stop_submitting, mut recv_stop_submitting) = tokio::sync::watch::channel(false);
// Instantiates a new `Miner` (a mock of an actual Mining Device) with a job id of 0.
let miner = Arc::new(Mutex::new(Miner::new(0)));

Expand Down Expand Up @@ -128,6 +129,9 @@ impl Client {
loop {
let message: String = receiver_outgoing.recv().await.unwrap();
(writer).write_all(message.as_bytes()).await.unwrap();
if message.contains("mining.submit") && single_submit {
send_stop_submitting.send(true).unwrap();
}
}
});

Expand Down Expand Up @@ -170,43 +174,53 @@ impl Client {
// Sends relevant candidate block header values needed to construct a
// `mining.submit` message to the `receiver_share` in the task that is responsible
// for sending messages to the Upstream node.
sender_share
if sender_share
.try_send((nonce, job_id.unwrap(), version.unwrap(), time))
.unwrap();
.is_err()
{
warn!("Share channel is not available");
break;
}
}
miner_cloned
.safe_lock(|m| m.header.as_mut().map(|h| h.nonce += 1))
.unwrap();
});

// Task to receive relevant candidate block header values needed to construct a
// `mining.submit` message. This message is contructed as a `client_to_server::Submit` and
// then serialized into json to be sent to the Upstream via the `sender_outgoing` sender.
let cloned = client.clone();
task::spawn(async move {
let recv = receiver_share.clone();
loop {
let (nonce, job_id, _version, ntime) = recv.recv().await.unwrap();
if cloned.clone().safe_lock(|c| c.status).unwrap() != ClientStatus::Subscribed {
continue;
}
let extra_nonce2: Extranonce =
vec![0; cloned.safe_lock(|c| c.extranonce2_size.unwrap()).unwrap()]
.try_into()
.unwrap();
let submit = client_to_server::Submit {
id: 0,
user_name: "user".into(), // TODO: user name should NOT be hardcoded
job_id: job_id.to_string(),
extra_nonce2,
time: HexU32Be(ntime),
nonce: HexU32Be(nonce),
version_bits: None,
};
let message: json_rpc::Message = submit.into();
let message = format!("{}\n", serde_json::to_string(&message).unwrap());
sender_outgoing_clone.send(message).await.unwrap();
}
tokio::select!(
_ = recv_stop_submitting.changed() => {
warn!("Stopping miner")
},
_ = async {
let recv = receiver_share.clone();
loop {
let (nonce, job_id, _version, ntime) = recv.recv().await.unwrap();
if cloned.clone().safe_lock(|c| c.status).unwrap() != ClientStatus::Subscribed {
continue;
}
let extra_nonce2: Extranonce =
vec![0; cloned.safe_lock(|c| c.extranonce2_size.unwrap()).unwrap()]
.try_into()
.unwrap();
let submit = client_to_server::Submit {
id: 0,
user_name: "user".into(), // TODO: user name should NOT be hardcoded
job_id: job_id.to_string(),
extra_nonce2,
time: HexU32Be(ntime),
nonce: HexU32Be(nonce),
version_bits: None,
};
let message: json_rpc::Message = submit.into();
let message = format!("{}\n", serde_json::to_string(&message).unwrap());
sender_outgoing_clone.send(message).await.unwrap();
}
} => {}
)
});
let recv_incoming = client.safe_lock(|c| c.receiver_incoming.clone()).unwrap();

Expand All @@ -226,8 +240,11 @@ impl Client {
// Waits for the `sender_incoming` to get message line from socket to be parsed by the
// `Client`
loop {
let incoming = recv_incoming.recv().await.unwrap();
Self::parse_message(client.clone(), Ok(incoming)).await;
if let Ok(incoming) = recv_incoming.clone().recv().await {
Self::parse_message(client.clone(), Ok(incoming)).await;
} else {
warn!("Error reading from socket via `recv_incoming` channel")
}
}
}

Expand Down
1 change: 1 addition & 0 deletions roles/test-utils/mining-device-sv1/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ async fn main() {
Client::connect(
80,
SocketAddr::from_str(ADDR).expect("Invalid upstream address"),
false,
)
.await
}
4 changes: 2 additions & 2 deletions roles/tests-integration/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,9 @@ pub fn measure_hashrate(duration_secs: u64) -> f64 {
hashes as f64 / elapsed_secs
}

pub async fn start_mining_device_sv1(upstream_addr: SocketAddr) {
pub async fn start_mining_device_sv1(upstream_addr: SocketAddr, single_submit: bool) {
tokio::spawn(async move {
mining_device_sv1::client::Client::connect(80, upstream_addr).await;
mining_device_sv1::client::Client::connect(80, upstream_addr, single_submit).await;
});
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
}
Expand Down
2 changes: 1 addition & 1 deletion roles/tests-integration/tests/translator_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ async fn translation_proxy() {
let (pool_translator_sniffer, pool_translator_sniffer_addr) =
start_sniffer("0".to_string(), pool_addr, false, None).await;
let (_, tproxy_addr) = start_sv2_translator(pool_translator_sniffer_addr).await;
let _mining_device = start_mining_device_sv1(tproxy_addr).await;
let _mining_device = start_mining_device_sv1(tproxy_addr, false).await;
pool_translator_sniffer
.wait_for_message_type(MessageDirection::ToUpstream, MESSAGE_TYPE_SETUP_CONNECTION)
.await;
Expand Down

0 comments on commit 529267a

Please sign in to comment.