diff --git a/lib/grammers-client/examples/echo.rs b/lib/grammers-client/examples/echo.rs index a42760c6..dd60e3d7 100644 --- a/lib/grammers-client/examples/echo.rs +++ b/lib/grammers-client/examples/echo.rs @@ -13,10 +13,7 @@ use std::env; use std::pin::pin; -use futures::{ - future::{select, Either}, - StreamExt, -}; +use futures::future::{select, Either}; use simple_logger::SimpleLogger; use tokio::{runtime, task}; @@ -74,8 +71,6 @@ async fn async_main() -> Result { println!("Signed in!"); } - let mut update_stream = client.update_stream(); - println!("Waiting for messages..."); // This code uses `select` on Ctrl+C to gracefully stop the client and have a chance to @@ -87,7 +82,7 @@ async fn async_main() -> Result { // so a manual `select` is used instead by pinning async blocks by hand. loop { let exit = pin!(async { tokio::signal::ctrl_c().await }); - let upd = pin!(async { update_stream.select_next_some().await }); + let upd = pin!(async { client.next_update().await }); let update = match select(exit, upd).await { Either::Left(_) => break, diff --git a/lib/grammers-client/examples/inline-pagination.rs b/lib/grammers-client/examples/inline-pagination.rs index 8afbd41b..f87b278f 100644 --- a/lib/grammers-client/examples/inline-pagination.rs +++ b/lib/grammers-client/examples/inline-pagination.rs @@ -25,10 +25,7 @@ use std::env; use std::pin::pin; -use futures::{ - future::{select, Either}, - StreamExt, -}; +use futures::future::{select, Either}; use simple_logger::SimpleLogger; use tokio::{runtime, task}; @@ -137,12 +134,10 @@ async fn async_main() -> Result { println!("Signed in!"); } - let mut update_stream = client.update_stream(); - println!("Waiting for messages..."); loop { let exit = pin!(async { tokio::signal::ctrl_c().await }); - let upd = pin!(async { update_stream.select_next_some().await }); + let upd = pin!(async { client.next_update().await }); let update = match select(exit, upd).await { Either::Left(_) => { diff --git a/lib/grammers-client/examples/reconnection.rs b/lib/grammers-client/examples/reconnection.rs index 95da5e4f..08a36304 100644 --- a/lib/grammers-client/examples/reconnection.rs +++ b/lib/grammers-client/examples/reconnection.rs @@ -1,6 +1,5 @@ //! this example demonstrate how to implement custom Reconnection Polies -use futures::TryStreamExt; use grammers_client::session::Session; use grammers_client::{Client, Config, InitParams, ReconnectionPolicy}; use std::ops::ControlFlow; @@ -43,19 +42,16 @@ async fn async_main() -> Result { /// happy listening to updates forever!! use grammers_client::Update; - client - .update_stream() - .try_for_each_concurrent(None, |update| async { - match update { - Update::NewMessage(message) if !message.outgoing() => { - message.respond(message.text()).await.map(|_| ()) - } - _ => Ok(()), - } - }) - .await?; + loop { + let update = client.next_update().await?; - Ok(()) + match update { + Update::NewMessage(message) if !message.outgoing() => { + message.respond(message.text()).await?; + } + _ => {} + } + } } fn main() -> Result { diff --git a/lib/grammers-client/src/client/updates.rs b/lib/grammers-client/src/client/updates.rs index 92745248..47f98d4d 100644 --- a/lib/grammers-client/src/client/updates.rs +++ b/lib/grammers-client/src/client/updates.rs @@ -10,17 +10,13 @@ use super::Client; use crate::types::{ChatMap, Update}; -use futures::stream::FusedStream; -use futures::Stream; -use futures_util::future::{select, Either}; +use futures::future::{select, Either}; pub use grammers_mtsender::{AuthorizationError, InvocationError}; use grammers_session::channel_id; pub use grammers_session::{PrematureEndReason, UpdateState}; use grammers_tl_types as tl; -use std::future::Future; use std::pin::pin; use std::sync::Arc; -use std::task::Poll; use std::time::{Duration, Instant}; use tokio::time::sleep_until; @@ -49,94 +45,16 @@ impl Client { /// # Ok(()) /// # } /// ``` - pub fn update_stream(&self) -> UpdateStream<'_> { - UpdateStream { client: self } - } - - pub(crate) fn process_socket_updates(&self, all_updates: Vec) { - if all_updates.is_empty() { - return; - } - - let mut result = Option::<(Vec<_>, Vec<_>, Vec<_>)>::None; - { - let state = &mut *self.0.state.write().unwrap(); - - for updates in all_updates { - if state - .message_box - .ensure_known_peer_hashes(&updates, &mut state.chat_hashes) - .is_err() - { - continue; - } - match state - .message_box - .process_updates(updates, &state.chat_hashes) - { - Ok(tup) => { - if let Some(res) = result.as_mut() { - res.0.extend(tup.0); - res.1.extend(tup.1); - res.2.extend(tup.2); - } else { - result = Some(tup); - } - } - Err(_) => return, - } - } - } - - if let Some((updates, users, chats)) = result { - self.extend_update_queue(updates, ChatMap::new(users, chats)); - } - } - - fn extend_update_queue(&self, mut updates: Vec, chat_map: Arc) { - let mut state = self.0.state.write().unwrap(); - - if let Some(limit) = self.0.config.params.update_queue_limit { - if let Some(exceeds) = (state.updates.len() + updates.len()).checked_sub(limit + 1) { - let exceeds = exceeds + 1; - let now = Instant::now(); - let notify = match state.last_update_limit_warn { - None => true, - Some(instant) => now - instant > UPDATE_LIMIT_EXCEEDED_LOG_COOLDOWN, - }; - - updates.truncate(updates.len() - exceeds); - if notify { - log::warn!( - "{} updates were dropped because the update_queue_limit was exceeded", - exceeds - ); - } + pub async fn next_update(&self) -> Result { + loop { + let (update, chats) = self.next_raw_update().await?; - state.last_update_limit_warn = Some(now); + if let Some(update) = Update::new(self, update, &chats) { + return Ok(update); } } - - state - .updates - .extend(updates.into_iter().map(|u| (u, chat_map.clone()))); } - /// Synchronize the updates state to the session. - pub fn sync_update_state(&self) { - let state = self.0.state.read().unwrap(); - self.0 - .config - .session - .set_state(state.message_box.session_state()); - } -} - -pub struct UpdateStream<'a> { - client: &'a Client, -} - -impl<'a> UpdateStream<'a> { /// Returns the next raw update and associated chat map from the buffer where they are queued until used. /// /// # Example @@ -161,7 +79,7 @@ impl<'a> UpdateStream<'a> { ) -> Result<(tl::enums::Update, Arc), InvocationError> { loop { let (deadline, get_diff, get_channel_diff) = { - let state = &mut *self.client.0.state.write().unwrap(); + let state = &mut *self.0.state.write().unwrap(); if let Some(update) = state.updates.pop_front() { return Ok(update); } @@ -173,20 +91,19 @@ impl<'a> UpdateStream<'a> { }; if let Some(request) = get_diff { - let response = self.client.invoke(&request).await?; + let response = self.invoke(&request).await?; let (updates, users, chats) = { - let state = &mut *self.client.0.state.write().unwrap(); + let state = &mut *self.0.state.write().unwrap(); state .message_box .apply_difference(response, &mut state.chat_hashes) }; - self.client - .extend_update_queue(updates, ChatMap::new(users, chats)); + self.extend_update_queue(updates, ChatMap::new(users, chats)); continue; } if let Some(request) = get_channel_diff { - let maybe_response = self.client.invoke(&request).await; + let maybe_response = self.invoke(&request).await; let response = match maybe_response { Ok(r) => r, @@ -204,8 +121,7 @@ impl<'a> UpdateStream<'a> { // Instead we manually extract the previously-known pts and use that. log::warn!("Getting difference for channel updates caused PersistentTimestampOutdated; ending getting difference prematurely until server issues are resolved"); { - self.client - .0 + self.0 .state .write() .unwrap() @@ -225,8 +141,7 @@ impl<'a> UpdateStream<'a> { .unwrap_or_else(|| "empty channel".into()) ); { - self.client - .0 + self.0 .state .write() .unwrap() @@ -238,8 +153,7 @@ impl<'a> UpdateStream<'a> { Err(InvocationError::Rpc(rpc_error)) if rpc_error.code == 500 => { log::warn!("Telegram is having internal issues: {:#?}", rpc_error); { - self.client - .0 + self.0 .state .write() .unwrap() @@ -255,7 +169,7 @@ impl<'a> UpdateStream<'a> { }; let (updates, users, chats) = { - let state = &mut *self.client.0.state.write().unwrap(); + let state = &mut *self.0.state.write().unwrap(); state.message_box.apply_channel_difference( request, response, @@ -263,13 +177,12 @@ impl<'a> UpdateStream<'a> { ) }; - self.client - .extend_update_queue(updates, ChatMap::new(users, chats)); + self.extend_update_queue(updates, ChatMap::new(users, chats)); continue; } let sleep = pin!(async { sleep_until(deadline.into()).await }); - let step = pin!(async { self.client.step().await }); + let step = pin!(async { self.step().await }); match select(sleep, step).await { Either::Left(_) => {} @@ -277,38 +190,83 @@ impl<'a> UpdateStream<'a> { } } } -} -impl<'a> Stream for UpdateStream<'a> { - type Item = Result; + pub(crate) fn process_socket_updates(&self, all_updates: Vec) { + if all_updates.is_empty() { + return; + } - fn poll_next( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - loop { - let (update, chats) = { - let this = self.next_raw_update(); - futures::pin_mut!(this); - match futures::ready!(this.poll(cx)) { - Ok(update) => update, - Err(e) => return Poll::Ready(Some(Err(e))), + let mut result = Option::<(Vec<_>, Vec<_>, Vec<_>)>::None; + { + let state = &mut *self.0.state.write().unwrap(); + + for updates in all_updates { + if state + .message_box + .ensure_known_peer_hashes(&updates, &mut state.chat_hashes) + .is_err() + { + continue; + } + match state + .message_box + .process_updates(updates, &state.chat_hashes) + { + Ok(tup) => { + if let Some(res) = result.as_mut() { + res.0.extend(tup.0); + res.1.extend(tup.1); + res.2.extend(tup.2); + } else { + result = Some(tup); + } + } + Err(_) => return, + } + } + } + + if let Some((updates, users, chats)) = result { + self.extend_update_queue(updates, ChatMap::new(users, chats)); + } + } + + fn extend_update_queue(&self, mut updates: Vec, chat_map: Arc) { + let mut state = self.0.state.write().unwrap(); + + if let Some(limit) = self.0.config.params.update_queue_limit { + if let Some(exceeds) = (state.updates.len() + updates.len()).checked_sub(limit + 1) { + let exceeds = exceeds + 1; + let now = Instant::now(); + let notify = match state.last_update_limit_warn { + None => true, + Some(instant) => now - instant > UPDATE_LIMIT_EXCEEDED_LOG_COOLDOWN, + }; + + updates.truncate(updates.len() - exceeds); + if notify { + log::warn!( + "{} updates were dropped because the update_queue_limit was exceeded", + exceeds + ); } - }; - if let Some(update) = Update::new(self.client, update, &chats) { - return Poll::Ready(Some(Ok(update))); + state.last_update_limit_warn = Some(now); } } + + state + .updates + .extend(updates.into_iter().map(|u| (u, chat_map.clone()))); } -} -impl<'a> FusedStream for UpdateStream<'a> { - fn is_terminated(&self) -> bool { - // The update stream is a continuous flow of updates. - // As a long-running stream, it never reaches a - // terminated state, hence we always return false. - false + /// Synchronize the updates state to the session. + pub fn sync_update_state(&self) { + let state = self.0.state.read().unwrap(); + self.0 + .config + .session + .set_state(state.message_box.session_state()); } } @@ -323,12 +281,10 @@ mod tests { #[test] fn ensure_next_update_future_impls_send() { - use futures::TryStreamExt; - if false { // We just want it to type-check, not actually run. fn typeck(_: impl Future + Send) {} - typeck(get_client().update_stream().try_next()); + typeck(get_client().next_update()); } } }