From f824796424da0569cb0a639e49e035835df49822 Mon Sep 17 00:00:00 2001 From: er-azh <80633916+er-azh@users.noreply.github.com> Date: Sun, 22 Dec 2024 08:36:44 -0500 Subject: [PATCH] Implement web-friendly versions of `sleep` and `sleep_until` --- lib/grammers-client/src/client/net.rs | 6 +- lib/grammers-client/src/client/updates.rs | 2 +- lib/grammers-client/src/types/action.rs | 3 +- lib/grammers-mtsender/Cargo.toml | 4 + lib/grammers-mtsender/DEPS.md | 10 +++ lib/grammers-mtsender/src/lib.rs | 7 +- lib/grammers-mtsender/src/utils.rs | 93 +++++++++++++++++++++++ 7 files changed, 118 insertions(+), 7 deletions(-) create mode 100644 lib/grammers-mtsender/src/utils.rs diff --git a/lib/grammers-client/src/client/net.rs b/lib/grammers-client/src/client/net.rs index 3d574e7d..1de76749 100644 --- a/lib/grammers-client/src/client/net.rs +++ b/lib/grammers-client/src/client/net.rs @@ -10,7 +10,9 @@ use super::{Client, ClientInner, Config}; use crate::utils; use grammers_mtproto::mtp; use grammers_mtproto::transport; -use grammers_mtsender::{self as sender, AuthorizationError, InvocationError, RpcError, Sender}; +use grammers_mtsender::{ + self as sender, utils::sleep, AuthorizationError, InvocationError, RpcError, Sender, +}; use grammers_session::{ChatHashCache, MessageBox}; use grammers_tl_types::{self as tl, Deserializable}; use log::{debug, info}; @@ -397,7 +399,7 @@ impl Connection { delay, std::any::type_name::() ); - tokio::time::sleep(delay).await; + sleep(delay).await; slept_flood = true; rx = self.request_tx.read().unwrap().enqueue(request); continue; diff --git a/lib/grammers-client/src/client/updates.rs b/lib/grammers-client/src/client/updates.rs index 4824dd2b..d64aa1e8 100644 --- a/lib/grammers-client/src/client/updates.rs +++ b/lib/grammers-client/src/client/updates.rs @@ -11,13 +11,13 @@ use super::Client; use crate::types::{ChatMap, Update}; use futures::future::{select, Either}; +use grammers_mtsender::utils::sleep_until; pub use grammers_mtsender::{AuthorizationError, InvocationError}; use grammers_session::channel_id; pub use grammers_session::{PrematureEndReason, UpdateState}; use grammers_tl_types as tl; use std::pin::pin; use std::sync::Arc; -use tokio::time::sleep_until; use std::time::Duration; use web_time::Instant; diff --git a/lib/grammers-client/src/types/action.rs b/lib/grammers-client/src/types/action.rs index e56de9cd..690e709d 100644 --- a/lib/grammers-client/src/types/action.rs +++ b/lib/grammers-client/src/types/action.rs @@ -6,6 +6,7 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. use futures::future::Either; +use grammers_mtsender::utils; use grammers_mtsender::InvocationError; use grammers_session::PackedChat; use grammers_tl_types as tl; @@ -118,7 +119,7 @@ impl ActionSender { let action = async { request_result = self.oneshot(action().into()).await; - tokio::time::sleep(self.repeat_delay).await; + utils::sleep(self.repeat_delay).await; }; tokio::pin!(action); diff --git a/lib/grammers-mtsender/Cargo.toml b/lib/grammers-mtsender/Cargo.toml index 7eb773a4..0dd45f34 100644 --- a/lib/grammers-mtsender/Cargo.toml +++ b/lib/grammers-mtsender/Cargo.toml @@ -31,6 +31,10 @@ hickory-resolver = { version = "0.24.1", optional = true } url = { version = "2.5.2", optional = true } web-time = "1.1.0" +[target.'cfg(all(target_arch = "wasm32", target_os = "unknown"))'.dependencies] +wasm-bindgen-futures = "0.4.49" +web-sys = {version = "0.3.76", features = ["Window"]} + [dev-dependencies] simple_logger = { version = "5.0.0", default-features = false, features = ["colors"] } tokio = { version = "1.40.0", features = ["rt"] } diff --git a/lib/grammers-mtsender/DEPS.md b/lib/grammers-mtsender/DEPS.md index 48cbc268..618596a4 100644 --- a/lib/grammers-mtsender/DEPS.md +++ b/lib/grammers-mtsender/DEPS.md @@ -54,3 +54,13 @@ SOCKS5 proxy support. Used for its web-friendly clock and timer as a replacement for `std::time` in the library. Automatically falls back to `std::time` when we're not targeting web. + +## web-sys + +Only used when targeting `wasm32-unknown-unknown`. Used by the `Timeout` implementation to +call `setTimeout` and `clearTimeout` in the browser. + +## wasm-bindgen-futures + +Only used when targeting `wasm32-unknown-unknown`. Used by the `Timeout` implementation to +convert a `Promise` into a `Future`. diff --git a/lib/grammers-mtsender/src/lib.rs b/lib/grammers-mtsender/src/lib.rs index 70c1100f..24c74435 100644 --- a/lib/grammers-mtsender/src/lib.rs +++ b/lib/grammers-mtsender/src/lib.rs @@ -10,6 +10,7 @@ mod errors; mod reconnection; +pub mod utils; pub use crate::reconnection::*; pub use errors::{AuthorizationError, InvocationError, ReadError, RpcError}; @@ -35,7 +36,7 @@ use tokio::net::TcpStream; use tokio::sync::mpsc; use tokio::sync::oneshot; use tokio::sync::oneshot::error::TryRecvError; -use tokio::time::sleep_until; +use utils::{sleep, sleep_until}; use web_time::{Instant, SystemTime}; #[cfg(feature = "proxy")] @@ -387,7 +388,7 @@ impl Sender { Err(e) => { attempts += 1; log::warn!("auto-reconnect failed {} time(s): {}", attempts, e); - tokio::time::sleep(Duration::from_secs(1)).await; + sleep(Duration::from_secs(1)).await; match self.reconnection_policy.should_retry(attempts) { ControlFlow::Break(_) => { @@ -398,7 +399,7 @@ impl Sender { return Err(e); } ControlFlow::Continue(duration) => { - tokio::time::sleep(duration).await; + sleep(duration).await; } } } diff --git a/lib/grammers-mtsender/src/utils.rs b/lib/grammers-mtsender/src/utils.rs new file mode 100644 index 00000000..4b308392 --- /dev/null +++ b/lib/grammers-mtsender/src/utils.rs @@ -0,0 +1,93 @@ +// Copyright 2020 - developers of the `grammers` project. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use std::time::Duration; +use web_time::Instant; + +/// A cancellable timeout for web platforms. +/// It is simply a wrapper around `window.setTimeout` but also makes +/// sure to clear the timeout when dropped to avoid leaking timers. +#[cfg(all(target_arch = "wasm32", target_os = "unknown"))] +pub struct Timeout { + handle: std::cell::OnceCell, + inner: wasm_bindgen_futures::JsFuture, +} + +#[cfg(all(target_arch = "wasm32", target_os = "unknown"))] +impl std::future::Future for Timeout { + type Output = (); + + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + std::pin::Pin::new(&mut self.get_mut().inner) + .poll(cx) + .map(|_| ()) + } +} + +#[cfg(all(target_arch = "wasm32", target_os = "unknown"))] +impl Drop for Timeout { + fn drop(&mut self) { + if let Some(handle) = self.handle.get() { + web_sys::window() + .unwrap() + .clear_timeout_with_handle(*handle); + } + } +} + +#[cfg(all(target_arch = "wasm32", target_os = "unknown"))] +impl Timeout { + pub fn new(duration: Duration) -> Self { + use wasm_bindgen_futures::js_sys; + + let handle = std::cell::OnceCell::new(); + let mut cb = |resolve: js_sys::Function, _reject: js_sys::Function| { + handle + .set( + web_sys::window() + .unwrap() + .set_timeout_with_callback_and_timeout_and_arguments_0( + &resolve, + duration.as_millis() as i32, + ) + .unwrap(), + ) + .expect("timeout already set"); + }; + + let inner = wasm_bindgen_futures::JsFuture::from(js_sys::Promise::new(&mut cb)); + Self { handle, inner } + } +} + +/// a web-friendly version of `tokio::time::sleep` +pub async fn sleep(duration: Duration) { + #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] + { + return tokio::time::sleep(duration).await; + } + #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] + { + Timeout::new(duration).await; + } +} + +/// a web-friendly version of `tokio::time::sleep_until` +pub async fn sleep_until(deadline: Instant) { + #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] + { + return tokio::time::sleep_until(deadline.into()).await; + } + #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] + { + Timeout::new(deadline - Instant::now()).await; + } +}