Skip to content

Commit

Permalink
feat: deprecate KeepAlive in favor of connection keepalives
Browse files Browse the repository at this point in the history
The connection-level pings introduced in
95ddb3d are a better mechanism, and
apply to any stream.
  • Loading branch information
colinmarc committed Feb 5, 2025
1 parent b3b3194 commit ad3cdca
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 40 deletions.
28 changes: 1 addition & 27 deletions mm-client-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,20 +466,9 @@ async fn spawn_conn(
// Spawn a second thread to fulfill request/response futures and drive
// the attachment delegates.

let outgoing_clone = outgoing_tx.clone();
let waker_clone = waker.clone();
let _ = std::thread::Builder::new()
.name("mmclient reactor".to_string())
.spawn(move || {
conn_reactor(
incoming_rx,
outgoing_clone,
waker_clone,
roundtrips_rx,
attachments_rx,
client,
)
})
.spawn(move || conn_reactor(incoming_rx, roundtrips_rx, attachments_rx, client))
.unwrap();

if ready_rx.await.is_err() {
Expand Down Expand Up @@ -509,8 +498,6 @@ struct InFlight {

fn conn_reactor(
incoming: flume::Receiver<conn::ConnEvent>,
outgoing: flume::Sender<conn::OutgoingMessage>,
conn_waker: Arc<mio::Waker>,
roundtrips: flume::Receiver<(u64, Roundtrip)>,
attachments: flume::Receiver<(u64, AttachmentState)>,
client: Arc<AsyncMutex<InnerClient>>,
Expand All @@ -537,19 +524,6 @@ fn conn_reactor(
let Roundtrip { tx, .. } = in_flight.roundtrips.remove(id).unwrap();
let _ = tx.send(Err(ClientError::RequestTimeout));
}

// Send keepalives.
if !in_flight.attachments.is_empty() {
for (sid, _) in in_flight.attachments.iter() {
let _ = outgoing.send(conn::OutgoingMessage {
sid: *sid,
msg: protocol::KeepAlive {}.into(),
fin: false,
});
}

let _ = conn_waker.wake();
}
}

enum SelectResult {
Expand Down
13 changes: 0 additions & 13 deletions mm-server/src/server/handlers/attachment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ struct AttachmentHandler<'a> {

last_video_frame_recvd: time::Instant,
last_audio_frame_recvd: time::Instant,
keepalive_timer: crossbeam_channel::Receiver<time::Instant>,

// For saving the bitstream to disk in bug reports.
bug_report_dir: Option<PathBuf>,
Expand All @@ -60,9 +59,6 @@ enum AttachmentError {
ServerError(ErrorCode, Option<String>),
}

/// How long to wait before kicking the client for inactivity.
const KEEPALIVE_TIMEOUT: time::Duration = time::Duration::from_secs(30);

pub fn attach(ctx: &super::Context, msg: protocol::Attach) -> Result<(), ServerError> {
let session_id = msg.session_id;
let handler = AttachmentHandler::new(ctx, msg)?;
Expand Down Expand Up @@ -182,7 +178,6 @@ impl<'a> AttachmentHandler<'a> {

let pointer_lock = None;

let keepalive_timer = crossbeam_channel::after(KEEPALIVE_TIMEOUT);
let now = time::Instant::now();

Ok(Self {
Expand All @@ -197,7 +192,6 @@ impl<'a> AttachmentHandler<'a> {

last_video_frame_recvd: now,
last_audio_frame_recvd: now,
keepalive_timer,

bug_report_dir,
bug_report_files: BTreeMap::default(),
Expand Down Expand Up @@ -229,9 +223,6 @@ impl<'a> AttachmentHandler<'a> {
recv(self.ctx.incoming) -> msg => {
match msg {
Ok(m) => {
// Reset timer.
self.keepalive_timer = crossbeam_channel::after(KEEPALIVE_TIMEOUT);

match self.handle_attachment_message(m) {
Ok(_) => (),
Err(AttachmentError::Finished) => return Ok(()),
Expand Down Expand Up @@ -267,10 +258,6 @@ impl<'a> AttachmentHandler<'a> {
}
}
},
recv(self.keepalive_timer) -> _ => {
debug!("client hung; ending attachment");
return Ok(());
}
}
}
}
Expand Down

0 comments on commit ad3cdca

Please sign in to comment.