Skip to content

Commit

Permalink
Add back connection progress indication
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolaschan committed Mar 23, 2024
1 parent 7cbb0f4 commit ac6030e
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 32 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@ edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[target.x86_64-unknown-linux-musl]
linker = "musl-gcc"

[dependencies]
async-trait = "0.1.77"
bincode = "1.3.3"
Expand Down Expand Up @@ -43,3 +40,4 @@ futures = "0.3.30"
tor-proto = "0.16.1"
tor-cell = "0.16.1"
http = "1.1.0"
itertools = "0.12.1"
60 changes: 33 additions & 27 deletions src/managed_peer.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::{sync::{
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
}};
};

use insanity_tui::{AppEvent, Peer, PeerState};
use itertools::Itertools;
use tokio::sync::{broadcast, mpsc};
use veq::veq::VeqSocket;

Expand Down Expand Up @@ -51,7 +52,9 @@ impl ManagedPeer {
pub async fn set_denoise(&self, denoise: bool) {
self.denoise.store(denoise, Ordering::Relaxed);
if let Some(sender) = &self.ui_sender {
sender.send(AppEvent::SetPeerDenoise(self.address.to_string(), denoise)).unwrap();
sender
.send(AppEvent::SetPeerDenoise(self.address.to_string(), denoise))
.unwrap();
}
}

Expand All @@ -61,7 +64,9 @@ impl ManagedPeer {
let mut volume_guard = self.volume.lock().unwrap();
*volume_guard = volume;
if let Some(sender) = ui_sender {
sender.send(AppEvent::SetPeerVolume(address, volume)).unwrap();
sender
.send(AppEvent::SetPeerVolume(address, volume))
.unwrap();
}
}

Expand All @@ -80,7 +85,7 @@ impl ManagedPeer {
let denoise = self.denoise.clone();
let volume = self.volume.clone();
let display_name = conn_manager.cached_display_name(&address);

let peer = Peer::new(
address.to_string(),
display_name,
Expand All @@ -92,7 +97,7 @@ impl ManagedPeer {
if let Some(sender) = &self.ui_sender {
sender.send(AppEvent::AddPeer(peer.clone())).unwrap();
}

let socket = self.socket.clone();

let mut shutdown_rx = self.shutdown_tx.subscribe();
Expand Down Expand Up @@ -127,7 +132,8 @@ impl ManagedPeer {
}
self.enabled.store(false, Ordering::Relaxed);
if let Some(sender) = &self.ui_sender {
sender.send(AppEvent::AddPeer(Peer::new(
sender
.send(AppEvent::AddPeer(Peer::new(
self.address.to_string(),
self.conn_manager.cached_display_name(&self.address),
PeerState::Disabled,
Expand All @@ -152,26 +158,26 @@ async fn connect(
log::info!("Connecting to peer {:?}", address);
if let Some((session, info)) = tokio::select! {
res = conn_manager.session(&mut socket, &address) => res,
// _x = async {
// if let Some(ref sender) = ui_sender {
// let mut index = 0;
// loop {
// if let Some(cached_peer_info) = conn_manager.cached_peer_info(&address) {
// let ip_addresses_sorted = cached_peer_info.conn_info.addresses.iter().sorted().collect::<Vec<_>>();
// let ip_address = ip_addresses_sorted.get(index).map(|x| x.to_string()).unwrap_or("".to_string());
// sender.send(AppEvent::AddPeer(Peer::new(
// address.clone().to_string(),
// Some(cached_peer_info.display_name.clone()),
// PeerState::Connecting(ip_address),
// denoise.load(Ordering::Relaxed),
// *volume.lock().unwrap(),
// ))).unwrap();
// index = (index + 1) % ip_addresses_sorted.len();
// }
// tokio::time::sleep(std::time::Duration::from_millis(50)).await;
// }
// }
// } => { return; },
_x = async {
if let Some(ref sender) = ui_sender {
let mut index = 0;
loop {
if let Some(cached_peer_info) = conn_manager.cached_peer_info(&address) {
let ip_addresses_sorted = cached_peer_info.conn_info.addresses.iter().sorted().collect::<Vec<_>>();
let ip_address = ip_addresses_sorted.get(index).map(|x| x.to_string()).unwrap_or("".to_string());
sender.send(AppEvent::AddPeer(Peer::new(
address.clone().to_string(),
Some(cached_peer_info.display_name.clone()),
PeerState::Connecting(ip_address),
denoise.load(Ordering::Relaxed),
*volume.lock().unwrap(),
))).unwrap();
index = (index + 1) % ip_addresses_sorted.len();
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
}
} => { return; },
_ = shutdown_receiver.recv() => { return; }
} {
log::info!("Connected to peer {:?}", address);
Expand Down
4 changes: 2 additions & 2 deletions src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,8 @@ async fn wait_for_peer_info(sidechannel: &mut OnionSidechannel) -> AugmentedInfo
loop {
match sidechannel.peer_info().await {
Ok(info) => return info,
Err(_e) => {
log::debug!("Error receving peer info.");
Err(e) => {
log::debug!("Error receving peer info: {}", e);
}
}
}
Expand Down

0 comments on commit ac6030e

Please sign in to comment.