From 79531588a54190d389415c58fddaac2148ede5f3 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Mon, 23 Dec 2024 18:04:20 +0100 Subject: [PATCH] Revert stream changes --- lib/grammers-client/Cargo.toml | 4 +- lib/grammers-client/DEPS.md | 4 +- lib/grammers-client/examples/dialogs.rs | 5 +- lib/grammers-client/examples/downloader.rs | 5 +- lib/grammers-client/examples/echo.rs | 10 +- .../examples/inline-pagination.rs | 10 +- lib/grammers-client/src/client/chats.rs | 174 +++++++----------- lib/grammers-client/src/client/dialogs.rs | 108 ++++------- lib/grammers-client/src/client/files.rs | 83 ++++----- lib/grammers-client/src/client/messages.rs | 162 ++++++---------- lib/grammers-client/src/client/updates.rs | 2 +- lib/grammers-client/src/lib.rs | 3 - lib/grammers-client/src/types/action.rs | 4 +- lib/grammers-client/src/types/chats.rs | 21 +-- lib/grammers-client/src/utils.rs | 20 -- 15 files changed, 220 insertions(+), 395 deletions(-) diff --git a/lib/grammers-client/Cargo.toml b/lib/grammers-client/Cargo.toml index 1111ac74..2f812a9a 100644 --- a/lib/grammers-client/Cargo.toml +++ b/lib/grammers-client/Cargo.toml @@ -24,7 +24,9 @@ default = ["fs"] [dependencies] chrono = "0.4.38" -futures = "0.3.31" +futures-util = { version = "0.3.30", default-features = false, features = [ + "alloc" +] } grammers-crypto = { path = "../grammers-crypto", version = "0.7.0" } grammers-mtproto = { path = "../grammers-mtproto", version = "0.7.0" } grammers-mtsender = { path = "../grammers-mtsender", version = "0.7.0" } diff --git a/lib/grammers-client/DEPS.md b/lib/grammers-client/DEPS.md index 1ec0597d..441e56fb 100644 --- a/lib/grammers-client/DEPS.md +++ b/lib/grammers-client/DEPS.md @@ -76,9 +76,9 @@ Used to test that this file lists all dependencies from `Cargo.toml`. Used for return custom types that `impl Future` so that the requests can be further configured without having to use `Box`. -## futures +## futures-util -Provides Stream functionality +Provides useful functions for working with futures/tasks. ## url diff --git a/lib/grammers-client/examples/dialogs.rs b/lib/grammers-client/examples/dialogs.rs index 805c508c..fa8f4a47 100644 --- a/lib/grammers-client/examples/dialogs.rs +++ b/lib/grammers-client/examples/dialogs.rs @@ -10,7 +10,6 @@ //! cargo run --example dialogs //! ``` -use futures::TryStreamExt; use grammers_client::session::Session; use grammers_client::{Client, Config, SignInError}; use simple_logger::SimpleLogger; @@ -89,10 +88,10 @@ async fn async_main() -> Result<()> { } } - let mut dialogs = client.stream_dialogs(); + let mut dialogs = client.iter_dialogs(); println!("Showing up to {} dialogs:", dialogs.total().await?); - while let Some(dialog) = dialogs.try_next().await? { + while let Some(dialog) = dialogs.next().await? { let chat = dialog.chat(); println!("- {: >10} {}", chat.id(), chat.name().unwrap_or_default()); } diff --git a/lib/grammers-client/examples/downloader.rs b/lib/grammers-client/examples/downloader.rs index a241a1c2..b098d361 100644 --- a/lib/grammers-client/examples/downloader.rs +++ b/lib/grammers-client/examples/downloader.rs @@ -17,7 +17,6 @@ use std::io::{BufRead, Write}; use std::path::Path; use std::{env, io}; -use futures::TryStreamExt; use grammers_client::{Client, Config, SignInError}; use mime::Mime; use mime_guess::mime; @@ -90,7 +89,7 @@ async fn async_main() -> Result<()> { let chat = maybe_chat.unwrap_or_else(|| panic!("Chat {chat_name} could not be found")); - let mut messages = client.stream_messages(&chat); + let mut messages = client.iter_messages(&chat); println!( "Chat {} has {} total messages.", @@ -100,7 +99,7 @@ async fn async_main() -> Result<()> { let mut counter = 0; - while let Some(msg) = messages.try_next().await? { + while let Some(msg) = messages.next().await? { counter += 1; println!("Message {}:{}", msg.id(), msg.text()); if let Some(media) = msg.media() { diff --git a/lib/grammers-client/examples/echo.rs b/lib/grammers-client/examples/echo.rs index dd60e3d7..7fd951d2 100644 --- a/lib/grammers-client/examples/echo.rs +++ b/lib/grammers-client/examples/echo.rs @@ -10,16 +10,14 @@ //! cargo run --example echo -- BOT_TOKEN //! ``` +use futures_util::future::{select, Either}; +use grammers_client::session::Session; +use grammers_client::{Client, Config, InitParams, Update}; +use simple_logger::SimpleLogger; use std::env; use std::pin::pin; - -use futures::future::{select, Either}; -use simple_logger::SimpleLogger; use tokio::{runtime, task}; -use grammers_client::session::Session; -use grammers_client::{Client, Config, InitParams, Update}; - type Result = std::result::Result<(), Box>; const SESSION_FILE: &str = "echo.session"; diff --git a/lib/grammers-client/examples/inline-pagination.rs b/lib/grammers-client/examples/inline-pagination.rs index f87b278f..bba622d4 100644 --- a/lib/grammers-client/examples/inline-pagination.rs +++ b/lib/grammers-client/examples/inline-pagination.rs @@ -22,16 +22,14 @@ //! how much data a button's payload can contain, and to keep it simple, we're storing it inline //! in decimal, so the numbers can't get too large). +use futures_util::future::{select, Either}; +use grammers_client::session::Session; +use grammers_client::{button, reply_markup, Client, Config, InputMessage, Update}; +use simple_logger::SimpleLogger; use std::env; use std::pin::pin; - -use futures::future::{select, Either}; -use simple_logger::SimpleLogger; use tokio::{runtime, task}; -use grammers_client::session::Session; -use grammers_client::{button, reply_markup, Client, Config, InputMessage, Update}; - type Result = std::result::Result<(), Box>; const SESSION_FILE: &str = "inline-pagination.session"; diff --git a/lib/grammers-client/src/client/chats.rs b/lib/grammers-client/src/client/chats.rs index 02bd1616..82f5a8c2 100644 --- a/lib/grammers-client/src/client/chats.rs +++ b/lib/grammers-client/src/client/chats.rs @@ -8,36 +8,25 @@ //! Methods related to users, groups and channels. +use super::Client; +use crate::types::{ + chats::AdminRightsBuilderInner, chats::BannedRightsBuilderInner, AdminRightsBuilder, + BannedRightsBuilder, Chat, ChatMap, IterBuffer, Message, Participant, Photo, User, +}; +use grammers_mtsender::RpcError; +pub use grammers_mtsender::{AuthorizationError, InvocationError}; +use grammers_session::{PackedChat, PackedType}; +use grammers_tl_types as tl; use std::collections::VecDeque; use std::future::Future; -use std::ops::DerefMut; -use std::pin::Pin; use std::sync::Arc; -use std::task::Poll; use std::time::Duration; -use futures::Stream; - -use grammers_mtsender::RpcError; -use grammers_session::{PackedChat, PackedType}; -use grammers_tl_types as tl; - -use super::Client; -use crate::{ - types::{ - chats::{AdminRightsBuilderInner, BannedRightsBuilderInner}, - AdminRightsBuilder, BannedRightsBuilder, Chat, ChatMap, IterBuffer, Message, Participant, - Photo, User, - }, - utils::poll_future_ready, -}; -pub use grammers_mtsender::{AuthorizationError, InvocationError}; - const MAX_PARTICIPANT_LIMIT: usize = 200; const MAX_PHOTO_LIMIT: usize = 100; const KICK_BAN_DURATION: i32 = 60; // in seconds, in case the second request fails -pub enum ParticipantStream { +pub enum ParticipantIter { Empty, Chat { client: Client, @@ -48,7 +37,7 @@ pub enum ParticipantStream { Channel(IterBuffer), } -impl ParticipantStream { +impl ParticipantIter { fn new(client: &Client, chat: PackedChat) -> Self { if let Some(channel) = chat.try_to_input_channel() { Self::Channel(IterBuffer::from_request( @@ -190,69 +179,58 @@ impl ParticipantStream { } } - /// apply a filter on fetched participants, note that this filter will apply only on large `Channel` and not small groups - pub fn filter_participants(mut self, filter: tl::enums::ChannelParticipantsFilter) -> Self { - match self { - ParticipantStream::Channel(ref mut c) => { - c.request.filter = filter; - self - } - _ => self, - } - } -} - -impl Stream for ParticipantStream { - type Item = Result; - - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { + /// Return the next `Participant` from the internal buffer, filling the buffer previously if + /// it's empty. + /// + /// Returns `None` if the `limit` is reached or there are no participants left. + pub async fn next(&mut self) -> Result, InvocationError> { // Need to split the `match` because `fill_buffer()` borrows mutably. - match self.deref_mut() { + match self { Self::Empty => {} Self::Chat { buffer, .. } => { if buffer.is_empty() { - if let Err(e) = poll_future_ready!(cx, self.fill_buffer()) { - return Poll::Ready(Some(Err(e))); - } + self.fill_buffer().await?; } } Self::Channel(iter) => { if let Some(result) = iter.next_raw() { - match result { - Ok(m) => return Poll::Ready(m.map(Ok)), - Err(e) => return Poll::Ready(Some(Err(e))), - } - } - - if let Err(e) = poll_future_ready!(cx, self.fill_buffer()) { - return Poll::Ready(Some(Err(e))); + return result; } + self.fill_buffer().await?; } } - match self.deref_mut() { - Self::Empty => Poll::Ready(None), + match self { + Self::Empty => Ok(None), Self::Chat { buffer, .. } => { let result = buffer.pop_front(); if buffer.is_empty() { *self = Self::Empty; } - Poll::Ready(result.map(Ok)) + Ok(result) + } + Self::Channel(iter) => Ok(iter.pop_item()), + } + } + + /// apply a filter on fetched participants, note that this filter will apply only on large `Channel` and not small groups + pub fn filter(mut self, filter: tl::enums::ChannelParticipantsFilter) -> Self { + match self { + ParticipantIter::Channel(ref mut c) => { + c.request.filter = filter; + self } - Self::Channel(iter) => Poll::Ready(iter.pop_item().map(Ok)), + _ => self, } } } -pub enum ProfilePhotoStream { +pub enum ProfilePhotoIter { User(IterBuffer), Chat(IterBuffer), } -impl ProfilePhotoStream { +impl ProfilePhotoIter { fn new(client: &Client, chat: PackedChat) -> Self { if let Some(user_id) = chat.try_to_input_user() { Self::User(IterBuffer::from_request( @@ -315,57 +293,45 @@ impl ProfilePhotoStream { iter.request.offset += photos.len() as i32; } - iter.buffer.extend(photos.into_iter().map(Photo::from_raw)); + iter.buffer + .extend(photos.into_iter().map(|x| Photo::from_raw(x))); Ok(total) } Self::Chat(_) => panic!("fill_buffer should not be called for Chat variant"), } } -} -impl Stream for ProfilePhotoStream { - type Item = Result; - - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { + /// Return the next photo from the internal buffer, filling the buffer previously if it's + /// empty. + /// + /// Returns `None` if the `limit` is reached or there are no photos left. + pub async fn next(&mut self) -> Result, InvocationError> { // Need to split the `match` because `fill_buffer()` borrows mutably. - match self.deref_mut() { + match self { Self::User(iter) => { if let Some(result) = iter.next_raw() { - match result { - Ok(m) => return Poll::Ready(m.map(Ok)), - Err(e) => return Poll::Ready(Some(Err(e))), - } - } - - if let Err(e) = poll_future_ready!(cx, self.fill_buffer()) { - return Poll::Ready(Some(Err(e))); + return result; } + self.fill_buffer().await?; } - Self::Chat(ref mut iter) => { - while let Some(maybe_message) = futures::ready!(Pin::new(&mut *iter).poll_next(cx)) - { - match maybe_message { - Ok(message) => { - if let Some(tl::enums::MessageAction::ChatEditPhoto( - tl::types::MessageActionChatEditPhoto { photo }, - )) = message.raw_action - { - return Poll::Ready(Some(Ok(Photo::from_raw(photo)))); - } - } - Err(e) => return Poll::Ready(Some(Err(e))), + Self::Chat(iter) => { + while let Some(message) = iter.next().await? { + if let Some(tl::enums::MessageAction::ChatEditPhoto( + tl::types::MessageActionChatEditPhoto { photo }, + )) = message.raw_action + { + return Ok(Some(Photo::from_raw(photo))); + } else { + continue; } } } } - match self.get_mut() { - Self::User(iter) => Poll::Ready(iter.pop_item().map(Ok)), - Self::Chat(_) => Poll::Ready(None), + match self { + Self::User(iter) => Ok(iter.pop_item()), + Self::Chat(_) => Ok(None), } } } @@ -465,7 +431,7 @@ impl Client { Ok(User::from_raw(res.pop().unwrap())) } - /// Get a stream over participants of a chat. + /// Iterate over the participants of a chat. /// /// The participants are returned in no particular order. /// @@ -474,11 +440,10 @@ impl Client { /// # Examples /// /// ``` - /// # use futures::TryStreamExt; /// # async fn f(chat: grammers_client::types::Chat, client: grammers_client::Client) -> Result<(), Box> { - /// let mut participants = client.stream_participants(&chat); + /// let mut participants = client.iter_participants(&chat); /// - /// while let Some(participant) = participants.try_next().await? { + /// while let Some(participant) = participants.next().await? { /// println!( /// "{} has role {:?}", /// participant.user.first_name().unwrap_or(&participant.user.id().to_string()), @@ -488,8 +453,8 @@ impl Client { /// # Ok(()) /// # } /// ``` - pub fn stream_participants>(&self, chat: C) -> ParticipantStream { - ParticipantStream::new(self, chat.into()) + pub fn iter_participants>(&self, chat: C) -> ParticipantIter { + ParticipantIter::new(self, chat.into()) } /// Kicks the participant from the chat. @@ -639,7 +604,7 @@ impl Client { ) } - /// Get stream over the history of profile photos for the given user or chat. + /// Iterate over the history of profile photos for the given user or chat. /// /// Note that the current photo might not be present in the history, and to avoid doing more /// work when it's generally not needed (the photo history tends to be complete but in some @@ -651,18 +616,17 @@ impl Client { /// # Examples /// /// ``` - /// # use futures::TryStreamExt; /// # async fn f(chat: grammers_client::types::Chat, client: grammers_client::Client) -> Result<(), Box> { - /// let mut photos = client.stream_profile_photos(&chat); + /// let mut photos = client.iter_profile_photos(&chat); /// - /// while let Some(photo) = photos.try_next().await? { + /// while let Some(photo) = photos.next().await? { /// println!("Did you know chat has a photo with ID {}?", photo.id()); /// } /// # Ok(()) /// # } /// ``` - pub fn stream_profile_photos>(&self, chat: C) -> ProfilePhotoStream { - ProfilePhotoStream::new(self, chat.into()) + pub fn iter_profile_photos>(&self, chat: C) -> ProfilePhotoIter { + ProfilePhotoIter::new(self, chat.into()) } /// Convert a [`PackedChat`] back into a [`Chat`]. diff --git a/lib/grammers-client/src/client/dialogs.rs b/lib/grammers-client/src/client/dialogs.rs index 87d07073..721a93eb 100644 --- a/lib/grammers-client/src/client/dialogs.rs +++ b/lib/grammers-client/src/client/dialogs.rs @@ -5,25 +5,18 @@ // , at your // option. This file may not be copied, modified, or distributed // except according to those terms. -use std::collections::HashMap; -use std::future::Future; -use std::task::Poll; - -use futures::Stream; - +use crate::types::{ChatMap, Dialog, IterBuffer, Message}; +use crate::Client; use grammers_mtsender::InvocationError; use grammers_session::PackedChat; use grammers_tl_types as tl; - -use crate::types::{ChatMap, Dialog, IterBuffer, Message}; -use crate::utils::poll_future_ready; -use crate::Client; +use std::collections::HashMap; const MAX_LIMIT: usize = 100; -pub type DialogStream = IterBuffer; +pub type DialogIter = IterBuffer; -impl DialogStream { +impl DialogIter { fn new(client: &Client) -> Self { // TODO let users tweak all the options from the request Self::from_request( @@ -60,30 +53,20 @@ impl DialogStream { self.total = Some(total); Ok(total) } -} - -impl Stream for DialogStream { - type Item = Result; - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { + /// Return the next `Dialog` from the internal buffer, filling the buffer previously if it's + /// empty. + /// + /// Returns `None` if the `limit` is reached or there are no dialogs left. + pub async fn next(&mut self) -> Result, InvocationError> { if let Some(result) = self.next_raw() { - match result { - Ok(m) => return Poll::Ready(m.map(Ok)), - Err(e) => return Poll::Ready(Some(Err(e))), - } + return result; } use tl::enums::messages::Dialogs; - let result = { - self.request.limit = self.determine_limit(MAX_LIMIT); - poll_future_ready!(cx, self.client.invoke(&self.request)) - }?; - - let (dialogs, messages, users, chats) = match result { + self.request.limit = self.determine_limit(MAX_LIMIT); + let (dialogs, messages, users, chats) = match self.client.invoke(&self.request).await? { Dialogs::Dialogs(d) => { self.last_chunk = true; self.total = Some(d.dialogs.len()); @@ -113,41 +96,33 @@ impl Stream for DialogStream { .collect::>(); { - { - let mut state = self.client.0.state.write().unwrap(); - for dialog in dialogs.iter() { - if let tl::enums::Dialog::Dialog(tl::types::Dialog { - peer: tl::enums::Peer::Channel(channel), - pts: Some(pts), - .. - }) = dialog - { - state - .message_box - .try_set_channel_state(channel.channel_id, *pts); - } + let mut state = self.client.0.state.write().unwrap(); + self.buffer.extend(dialogs.into_iter().map(|dialog| { + if let tl::enums::Dialog::Dialog(tl::types::Dialog { + peer: tl::enums::Peer::Channel(channel), + pts: Some(pts), + .. + }) = &dialog + { + state + .message_box + .try_set_channel_state(channel.channel_id, *pts); } - } - - self.buffer.extend( - dialogs - .into_iter() - .map(|dialog| Dialog::new(dialog, &mut messages, &chats)), - ); + Dialog::new(dialog, &mut messages, &chats) + })); } // Don't bother updating offsets if this is the last time stuff has to be fetched. if !self.last_chunk && !self.buffer.is_empty() { self.request.exclude_pinned = true; - if let Some((date, id)) = self + if let Some(last_message) = self .buffer .iter() .rev() .find_map(|dialog| dialog.last_message.as_ref()) - .map(|lm| (lm.raw.date, lm.raw.id)) { - self.request.offset_date = date; - self.request.offset_id = id; + self.request.offset_date = last_message.raw.date; + self.request.offset_id = last_message.raw.id; } self.request.offset_peer = self.buffer[self.buffer.len() - 1] .chat() @@ -155,44 +130,33 @@ impl Stream for DialogStream { .to_input_peer(); } - Poll::Ready(self.pop_item().map(Ok)) - } - - fn size_hint(&self) -> (usize, Option) { - match self.total { - Some(total) => { - let rem = total - self.fetched; - (rem, Some(rem)) - } - None => (0, None), - } + Ok(self.pop_item()) } } /// Method implementations related to open conversations. impl Client { - /// Returns a new stream over the dialogs. + /// Returns a new iterator over the dialogs. /// - /// While streaming, the update state for any broadcast channel or megagroup will be set if it was unknown before. + /// While iterating, the update state for any broadcast channel or megagroup will be set if it was unknown before. /// When the update state is set for these chats, the library can actively check to make sure it's not missing any /// updates from them (as long as the queue limit for updates is larger than zero). /// /// # Examples /// /// ``` - /// # use futures::TryStreamExt; /// # async fn f(client: grammers_client::Client) -> Result<(), Box> { - /// let mut dialogs = client.stream_dialogs(); + /// let mut dialogs = client.iter_dialogs(); /// - /// while let Some(dialog) = dialogs.try_next().await? { + /// while let Some(dialog) = dialogs.next().await? { /// let chat = dialog.chat(); /// println!("{} ({})", chat.name().unwrap_or_default(), chat.id()); /// } /// # Ok(()) /// # } /// ``` - pub fn stream_dialogs(&self) -> DialogStream { - DialogStream::new(self) + pub fn iter_dialogs(&self) -> DialogIter { + DialogIter::new(self) } /// Deletes a dialog, effectively removing it from your list of open conversations. diff --git a/lib/grammers-client/src/client/files.rs b/lib/grammers-client/src/client/files.rs index 6226aa8f..7c2aa5f0 100644 --- a/lib/grammers-client/src/client/files.rs +++ b/lib/grammers-client/src/client/files.rs @@ -5,30 +5,22 @@ // , at your // option. This file may not be copied, modified, or distributed // except according to those terms. -use std::future::Future; -use std::sync::Arc; -use std::task::Poll; -use futures::{ - stream::{FuturesUnordered, StreamExt}, - Stream, -}; +use crate::types::{photo_sizes::PhotoSize, Downloadable, Uploaded}; +use crate::utils::generate_random_id; +use crate::Client; +use futures_util::stream::{FuturesUnordered, StreamExt as _}; +use grammers_mtsender::InvocationError; +use grammers_tl_types as tl; +use std::sync::Arc; use tokio::{ io::{self, AsyncRead, AsyncReadExt}, sync::Mutex as AsyncMutex, }; -use grammers_mtsender::InvocationError; -use grammers_tl_types as tl; - -use crate::types::{photo_sizes::PhotoSize, Downloadable, Uploaded}; -use crate::utils::{generate_random_id, poll_future_ready}; -use crate::Client; - #[cfg(feature = "fs")] use { crate::types::Media, - futures::TryStreamExt, std::{io::SeekFrom, path::Path}, tokio::{ fs, @@ -43,15 +35,14 @@ const FILE_MIGRATE_ERROR: i32 = 303; const BIG_FILE_SIZE: usize = 10 * 1024 * 1024; const WORKER_COUNT: usize = 4; -pub struct DownloadStream { +pub struct DownloadIter { client: Client, done: bool, request: tl::functions::upload::GetFile, photo_size_data: Option>, - dc: Option, } -impl DownloadStream { +impl DownloadIter { fn new(client: &Client, downloadable: &Downloadable) -> Self { match downloadable { Downloadable::PhotoSize(photo_size) @@ -79,7 +70,6 @@ impl DownloadStream { limit: MAX_CHUNK_SIZE, }, photo_size_data: None, - dc: None, } } @@ -103,7 +93,6 @@ impl DownloadStream { limit: MAX_CHUNK_SIZE, }, photo_size_data: Some(data), - dc: None, } } @@ -127,33 +116,26 @@ impl DownloadStream { self.request.offset += (self.request.limit * n) as i64; self } -} - -impl Stream for DownloadStream { - type Item = Result, InvocationError>; - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { + /// Fetch and return the next chunk. + pub async fn next(&mut self) -> Result>, InvocationError> { if self.done { - return Poll::Ready(None); + return Ok(None); } - if let Some(data) = self.photo_size_data.take() { + if let Some(data) = &self.photo_size_data { self.done = true; - return Poll::Ready(Some(Ok(data))); + return Ok(Some(data.clone())); } use tl::enums::upload::File; // TODO handle maybe FILEREF_UPGRADE_NEEDED + let mut dc: Option = None; loop { - let result = match self.dc.take() { - Some(dc) => { - poll_future_ready!(cx, self.client.invoke_in_dc(&self.request, dc as i32)) - } - None => poll_future_ready!(cx, self.client.invoke(&self.request)), + let result = match dc.take() { + None => self.client.invoke(&self.request).await, + Some(dc) => self.client.invoke_in_dc(&self.request, dc as i32).await, }; break match result { @@ -161,21 +143,21 @@ impl Stream for DownloadStream { if f.bytes.len() < self.request.limit as usize { self.done = true; if f.bytes.is_empty() { - return Poll::Ready(None); + return Ok(None); } } self.request.offset += self.request.limit as i64; - Poll::Ready(Some(Ok(f.bytes))) + Ok(Some(f.bytes)) } Ok(File::CdnRedirect(_)) => { panic!("API returned File::CdnRedirect even though cdn_supported = false"); } Err(InvocationError::Rpc(err)) if err.code == FILE_MIGRATE_ERROR => { - self.dc = err.value; + dc = err.value; continue; } - Err(e) => Poll::Ready(Some(Err(e))), + Err(e) => Err(e), }; } } @@ -183,17 +165,16 @@ impl Stream for DownloadStream { /// Method implementations related to uploading or downloading files. impl Client { - /// Returns a new stream over the contents of a media document that will be downloaded. + /// Returns a new iterator over the contents of a media document that will be downloaded. /// /// # Examples /// /// ``` - /// # use futures::TryStreamExt; /// # async fn f(downloadable: grammers_client::types::Downloadable, client: grammers_client::Client) -> Result<(), Box> { /// let mut file_bytes = Vec::new(); - /// let mut download = client.stream_download(&downloadable); + /// let mut download = client.iter_download(&downloadable); /// - /// while let Some(chunk) = download.try_next().await? { + /// while let Some(chunk) = download.next().await? { /// file_bytes.extend(chunk); /// } /// @@ -201,15 +182,15 @@ impl Client { /// # Ok(()) /// # } /// ``` - pub fn stream_download(&self, downloadable: &Downloadable) -> DownloadStream { - DownloadStream::new(self, downloadable) + pub fn iter_download(&self, downloadable: &Downloadable) -> DownloadIter { + DownloadIter::new(self, downloadable) } /// Downloads a media file into the specified path. /// /// If the file already exists, it will be overwritten. /// - /// This is a small wrapper around [`Client::stream_download`] for the common case of + /// This is a small wrapper around [`Client::iter_download`] for the common case of /// wanting to save the file locally. /// /// # Examples @@ -260,15 +241,15 @@ impl Client { return Ok(()); } - let mut download = self.stream_download(downloadable); + let mut download = self.iter_download(downloadable); Client::load(path, &mut download).await } #[cfg(feature = "fs")] - async fn load>(path: P, download: &mut DownloadStream) -> Result<(), io::Error> { + async fn load>(path: P, download: &mut DownloadIter) -> Result<(), io::Error> { let mut file = fs::File::create(path).await?; while let Some(chunk) = download - .try_next() + .next() .await .map_err(|e| io::Error::new(io::ErrorKind::Other, e))? { @@ -279,7 +260,6 @@ impl Client { } /// Downloads a `Document` to specified path using multiple connections - #[cfg(feature = "fs")] async fn download_media_concurrent>( &self, media: &Media, @@ -533,7 +513,6 @@ impl Client { /// ``` /// /// [`InputMessage`]: crate::InputMessage - #[cfg(feature = "fs")] pub async fn upload_file>(&self, path: P) -> Result { let path = path.as_ref(); diff --git a/lib/grammers-client/src/client/messages.rs b/lib/grammers-client/src/client/messages.rs index 2057c34b..dad47c02 100644 --- a/lib/grammers-client/src/client/messages.rs +++ b/lib/grammers-client/src/client/messages.rs @@ -9,17 +9,14 @@ //! Methods related to sending messages. use crate::types::message::EMPTY_MESSAGE; use crate::types::{InputReactions, IterBuffer, Message}; -use crate::utils::{generate_random_id, generate_random_ids, poll_future_ready}; +use crate::utils::{generate_random_id, generate_random_ids}; use crate::{types, ChatMap, Client, InputMedia}; use chrono::{DateTime, FixedOffset}; -use futures::Stream; pub use grammers_mtsender::{AuthorizationError, InvocationError}; use grammers_session::PackedChat; use grammers_tl_types as tl; use log::{log_enabled, warn, Level}; use std::collections::HashMap; -use std::future::Future; -use std::task::Poll; use tl::enums::InputPeer; fn map_random_ids_to_messages( @@ -193,9 +190,9 @@ impl> IterBuffer; +pub type MessageIter = IterBuffer; -impl MessageStream { +impl MessageIter { fn new(client: &Client, peer: PackedChat) -> Self { Self::from_request( client, @@ -230,48 +227,33 @@ impl MessageStream { self.request.limit = 1; self.get_total().await } -} - -impl Stream for MessageStream { - type Item = Result; - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { + /// Return the next `Message` from the internal buffer, filling the buffer previously if it's + /// empty. + /// + /// Returns `None` if the `limit` is reached or there are no messages left. + pub async fn next(&mut self) -> Result, InvocationError> { if let Some(result) = self.next_raw() { - match result { - Ok(m) => return Poll::Ready(m.map(Ok)), - Err(e) => return Poll::Ready(Some(Err(e))), - } + return result; } - { - self.request.limit = self.determine_limit(MAX_LIMIT); - let limit = self.request.limit; - if let Err(e) = poll_future_ready!(cx, self.fill_buffer(limit)) { - return Poll::Ready(Some(Err(e))); - } - } + self.request.limit = self.determine_limit(MAX_LIMIT); + self.fill_buffer(self.request.limit).await?; // Don't bother updating offsets if this is the last time stuff has to be fetched. if !self.last_chunk && !self.buffer.is_empty() { - let (offset_id, offset_date) = { - let last = &self.buffer[self.buffer.len() - 1]; - (last.raw.id, last.raw.date) - }; - - self.request.offset_id = offset_id; - self.request.offset_date = offset_date; + let last = &self.buffer[self.buffer.len() - 1]; + self.request.offset_id = last.raw.id; + self.request.offset_date = last.raw.date; } - Poll::Ready(self.pop_item().map(Ok)) + Ok(self.pop_item()) } } -pub type SearchStream = IterBuffer; +pub type SearchIter = IterBuffer; -impl SearchStream { +impl SearchIter { fn new(client: &Client, peer: PackedChat) -> Self { // TODO let users tweak all the options from the request Self::from_request( @@ -369,47 +351,33 @@ impl SearchStream { self.request.limit = 0; self.get_total().await } -} -impl Stream for SearchStream { - type Item = Result; - - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { + /// Return the next `Message` from the internal buffer, filling the buffer previously if it's + /// empty. + /// + /// Returns `None` if the `limit` is reached or there are no messages left. + pub async fn next(&mut self) -> Result, InvocationError> { if let Some(result) = self.next_raw() { - match result { - Ok(m) => return Poll::Ready(m.map(Ok)), - Err(e) => return Poll::Ready(Some(Err(e))), - } + return result; } - { - self.request.limit = self.determine_limit(MAX_LIMIT); - let limit = self.request.limit; - if let Err(e) = poll_future_ready!(cx, self.fill_buffer(limit)) { - return Poll::Ready(Some(Err(e))); - } - } + self.request.limit = self.determine_limit(MAX_LIMIT); + self.fill_buffer(self.request.limit).await?; // Don't bother updating offsets if this is the last time stuff has to be fetched. if !self.last_chunk && !self.buffer.is_empty() { - let (last_id, last_date) = { - let last = &self.buffer[self.buffer.len() - 1]; - (last.raw.id, last.raw.date) - }; - self.request.offset_id = last_id; - self.request.max_date = last_date; + let last = &self.buffer[self.buffer.len() - 1]; + self.request.offset_id = last.raw.id; + self.request.max_date = last.raw.date; } - Poll::Ready(self.pop_item().map(Ok)) + Ok(self.pop_item()) } } -pub type GlobalSearchStream = IterBuffer; +pub type GlobalSearchIter = IterBuffer; -impl GlobalSearchStream { +impl GlobalSearchIter { fn new(client: &Client) -> Self { // TODO let users tweak all the options from the request Self::from_request( @@ -456,43 +424,28 @@ impl GlobalSearchStream { self.request.limit = 1; self.get_total().await } -} -impl Stream for GlobalSearchStream { - type Item = Result; - - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { + /// Return the next `Message` from the internal buffer, filling the buffer previously if it's + /// empty. + /// + /// Returns `None` if the `limit` is reached or there are no messages left. + pub async fn next(&mut self) -> Result, InvocationError> { if let Some(result) = self.next_raw() { - match result { - Ok(m) => return Poll::Ready(m.map(Ok)), - Err(e) => return Poll::Ready(Some(Err(e))), - } + return result; } - let offset_rate = { - self.request.limit = self.determine_limit(MAX_LIMIT); - let limit = self.request.limit; - match poll_future_ready!(cx, self.fill_buffer(limit)) { - Ok(offset_rate) => offset_rate, - Err(e) => return Poll::Ready(Some(Err(e))), - } - }; + self.request.limit = self.determine_limit(MAX_LIMIT); + let offset_rate = self.fill_buffer(self.request.limit).await?; // Don't bother updating offsets if this is the last time stuff has to be fetched. if !self.last_chunk && !self.buffer.is_empty() { + let last = &self.buffer[self.buffer.len() - 1]; self.request.offset_rate = offset_rate.unwrap_or(0); - let (offset_peer, offset_id) = { - let last = &self.buffer[self.buffer.len() - 1]; - (last.chat().pack().to_input_peer(), last.raw.id) - }; - self.request.offset_peer = offset_peer; - self.request.offset_id = offset_id; + self.request.offset_peer = last.chat().pack().to_input_peer(); + self.request.offset_id = last.raw.id; } - Poll::Ready(self.pop_item().map(Ok)) + Ok(self.pop_item()) } } @@ -957,50 +910,48 @@ impl Client { .filter(|m| !filter_req || m.raw.peer_id == message.raw.peer_id)) } - /// Get a stream over the message history of a chat, from most recent to oldest. + /// Iterate over the message history of a chat, from most recent to oldest. /// /// # Examples /// /// ``` - /// # use futures::TryStreamExt; /// # async fn f(chat: grammers_client::types::Chat, client: grammers_client::Client) -> Result<(), Box> { /// // Note we're setting a reasonable limit, or we'd print out ALL the messages in chat! - /// let mut messages = client.stream_messages(&chat).limit(100); + /// let mut messages = client.iter_messages(&chat).limit(100); /// - /// while let Some(message) = messages.try_next().await? { + /// while let Some(message) = messages.next().await? { /// println!("{}", message.text()); /// } /// # Ok(()) /// # } /// ``` - pub fn stream_messages>(&self, chat: C) -> MessageStream { - MessageStream::new(self, chat.into()) + pub fn iter_messages>(&self, chat: C) -> MessageIter { + MessageIter::new(self, chat.into()) } - /// Get a stream over the messages that match certain search criteria. + /// Iterate over the messages that match certain search criteria. /// /// This allows you to search by text within a chat or filter by media among other things. /// /// # Examples /// /// ``` - /// # use futures::TryStreamExt; /// # async fn f(chat: grammers_client::types::Chat, client: grammers_client::Client) -> Result<(), Box> { /// // Let's print all the people who think grammers is cool. /// let mut messages = client.search_messages(&chat).query("grammers is cool"); /// - /// while let Some(message) = messages.try_next().await? { + /// while let Some(message) = messages.next().await? { /// let sender = message.sender().unwrap(); /// println!("{}", sender.name().unwrap_or(&sender.id().to_string())); /// } /// # Ok(()) /// # } /// ``` - pub fn search_messages>(&self, chat: C) -> SearchStream { - SearchStream::new(self, chat.into()) + pub fn search_messages>(&self, chat: C) -> SearchIter { + SearchIter::new(self, chat.into()) } - /// Get a stream over the messages that match certain search criteria, without being restricted to + /// Iterate over the messages that match certain search criteria, without being restricted to /// searching in a specific chat. The downside is that this global search supports less filters. /// /// This allows you to search by text within a chat or filter by media among other things. @@ -1008,19 +959,18 @@ impl Client { /// # Examples /// /// ``` - /// # use futures::TryStreamExt; /// # async fn f(client: grammers_client::Client) -> Result<(), Box> { /// // Let's print all the chats were people think grammers is cool. /// let mut messages = client.search_all_messages().query("grammers is cool"); /// - /// while let Some(message) = messages.try_next().await? { + /// while let Some(message) = messages.next().await? { /// println!("{}", message.chat().name().unwrap_or(&message.chat().id().to_string())); /// } /// # Ok(()) /// # } /// ``` - pub fn search_all_messages(&self) -> GlobalSearchStream { - GlobalSearchStream::new(self) + pub fn search_all_messages(&self) -> GlobalSearchIter { + GlobalSearchIter::new(self) } /// Get up to 100 messages using their ID. diff --git a/lib/grammers-client/src/client/updates.rs b/lib/grammers-client/src/client/updates.rs index d64aa1e8..15860ee9 100644 --- a/lib/grammers-client/src/client/updates.rs +++ b/lib/grammers-client/src/client/updates.rs @@ -10,7 +10,7 @@ use super::Client; use crate::types::{ChatMap, Update}; -use futures::future::{select, Either}; +use futures_util::future::{select, Either}; use grammers_mtsender::utils::sleep_until; pub use grammers_mtsender::{AuthorizationError, InvocationError}; use grammers_session::channel_id; diff --git a/lib/grammers-client/src/lib.rs b/lib/grammers-client/src/lib.rs index fe22ae85..a4a8b18a 100644 --- a/lib/grammers-client/src/lib.rs +++ b/lib/grammers-client/src/lib.rs @@ -56,6 +56,3 @@ pub use grammers_mtproto::transport; pub use grammers_mtsender::{FixedReconnect, InvocationError, NoReconnect, ReconnectionPolicy}; pub use grammers_session as session; pub use grammers_tl_types; - -// re-export futures -pub use futures; diff --git a/lib/grammers-client/src/types/action.rs b/lib/grammers-client/src/types/action.rs index 690e709d..25c0e21f 100644 --- a/lib/grammers-client/src/types/action.rs +++ b/lib/grammers-client/src/types/action.rs @@ -5,7 +5,7 @@ // , at your // option. This file may not be copied, modified, or distributed // except according to those terms. -use futures::future::Either; +use futures_util::future::Either; use grammers_mtsender::utils; use grammers_mtsender::InvocationError; use grammers_session::PackedChat; @@ -124,7 +124,7 @@ impl ActionSender { tokio::pin!(action); - match futures::future::select(action, &mut future).await { + match futures_util::future::select(action, &mut future).await { Either::Left((_, _)) => continue, Either::Right((output, _)) => break output, } diff --git a/lib/grammers-client/src/types/chats.rs b/lib/grammers-client/src/types/chats.rs index a4f0691a..d18e9cce 100644 --- a/lib/grammers-client/src/types/chats.rs +++ b/lib/grammers-client/src/types/chats.rs @@ -5,7 +5,12 @@ // , at your // option. This file may not be copied, modified, or distributed // except according to those terms. - +use crate::types::Role; +use crate::Client; +use grammers_mtsender::{InvocationError, RpcError}; +use grammers_session::PackedChat; +use grammers_tl_types as tl; +use pin_project_lite::pin_project; use std::{ future::Future, marker::PhantomPinned, @@ -14,18 +19,8 @@ use std::{ task::{Context, Poll}, time::Duration, }; - -use futures::TryStreamExt; -use pin_project_lite::pin_project; use web_time::{SystemTime, UNIX_EPOCH}; -use grammers_mtsender::{InvocationError, RpcError}; -use grammers_session::PackedChat; -use grammers_tl_types as tl; - -use crate::types::Role; -use crate::Client; - type BuilderRes = Result<(), InvocationError>; type AdminFutGen = fn(AdminRightsBuilderInner) -> F; @@ -190,8 +185,8 @@ impl> AdminRightsBuilder { } }; - let mut participants = s.client.stream_participants(s.chat); - while let Some(participant) = participants.try_next().await? { + let mut participants = s.client.iter_participants(s.chat); + while let Some(participant) = participants.next().await? { if matches!(participant.role, Role::Creator(_) | Role::Admin(_)) && participant.user.id() == uid { diff --git a/lib/grammers-client/src/utils.rs b/lib/grammers-client/src/utils.rs index 60557873..c2e1a877 100644 --- a/lib/grammers-client/src/utils.rs +++ b/lib/grammers-client/src/utils.rs @@ -103,23 +103,3 @@ pub(crate) fn always_find_entity( None => types::Chat::unpack(get_packed()), } } - -/// A helper macro to poll the future using the given context. -macro_rules! poll_future { - ($cx:expr, $($future:tt)*) => {{ - let this = $($future)*; - futures::pin_mut!(this); - this.poll($cx) - }}; -} - -/// A helper macro to poll the future using the given context. -/// return `Poll::Pending` early in case that future returned `Poll::Pending`. -macro_rules! poll_future_ready { - ($cx:expr, $($future:tt)*) => { - futures::ready!(crate::utils::poll_future!($cx, $($future)*)) - }; -} - -pub(crate) use poll_future; -pub(crate) use poll_future_ready;