Skip to content

Commit

Permalink
Try to use destruct/enum support
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e committed Jun 1, 2019
1 parent 6db89d3 commit 7d19cc1
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 49 deletions.
31 changes: 10 additions & 21 deletions futures-util/src/future/flatten_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@ use core::pin::Pin;
use futures_core::future::Future;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
use pin_utils::unsafe_pinned;
use pin_project::{pin_project, unsafe_project};

/// Stream for the [`flatten_stream`](super::FutureExt::flatten_stream) method.
#[unsafe_project]
#[must_use = "streams do nothing unless polled"]
pub struct FlattenStream<Fut: Future> {
#[pin]
state: State<Fut, Fut::Output>,
}

impl<Fut: Future> FlattenStream<Fut> {
unsafe_pinned!(state: State<Fut, Fut::Output>);

pub(super) fn new(future: Fut) -> FlattenStream<Fut> {
FlattenStream {
state: State::Future(future)
Expand All @@ -32,26 +32,13 @@ impl<Fut> fmt::Debug for FlattenStream<Fut>
}
}

#[unsafe_project(Unpin)]
#[derive(Debug)]
enum State<Fut, St> {
// future is not yet called or called and not ready
Future(Fut),
Future(#[pin] Fut),
// future resolved to Stream
Stream(St),
}

impl<Fut, St> State<Fut, St> {
fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> State<Pin<&'a mut Fut>, Pin<&'a mut St>> {
// safety: data is never moved via the resulting &mut reference
match unsafe { Pin::get_unchecked_mut(self) } {
// safety: the future we're re-pinning here will never be moved;
// it will just be polled, then dropped in place
State::Future(f) => State::Future(unsafe { Pin::new_unchecked(f) }),
// safety: the stream we're repinning here will never be moved;
// it will just be polled, then dropped in place
State::Stream(s) => State::Stream(unsafe { Pin::new_unchecked(s) }),
}
}
Stream(#[pin] St),
}

impl<Fut> FusedStream for FlattenStream<Fut>
Expand All @@ -72,15 +59,17 @@ impl<Fut> Stream for FlattenStream<Fut>
{
type Item = <Fut::Output as Stream>::Item;

#[pin_project(self)]
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
match self.as_mut().state().get_pin_mut() {
#[project]
match self.state.as_mut().project() {
State::Future(f) => {
let stream = ready!(f.poll(cx));
// Future resolved to stream.
// We do not return, but poll that
// stream in the next loop iteration.
self.as_mut().state().set(State::Stream(stream));
self.state.set(State::Stream(stream));
}
State::Stream(s) => return s.poll_next(cx),
}
Expand Down
12 changes: 7 additions & 5 deletions futures-util/src/io/lines.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@
use super::read_line::read_line_internal;
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
use futures_io::AsyncBufRead;
use pin_project::{pin_project, unsafe_project};
use std::io;
use std::mem;
use std::pin::Pin;
use super::read_line::read_line_internal;

/// Stream for the [`lines`](super::AsyncBufReadExt::lines) method.
#[unsafe_project(Unpin)]
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Lines<R> {
#[pin]
reader: R,
buf: String,
bytes: Vec<u8>,
read: usize,
}

impl<R: Unpin> Unpin for Lines<R> {}

impl<R: AsyncBufRead> Lines<R> {
pub(super) fn new(reader: R) -> Self {
Self {
Expand All @@ -32,9 +33,10 @@ impl<R: AsyncBufRead> Lines<R> {
impl<R: AsyncBufRead> Stream for Lines<R> {
type Item = io::Result<String>;

#[pin_project(self)]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let Self { reader, buf, bytes, read } = unsafe { Pin::get_unchecked_mut(self) };
let reader = unsafe { Pin::new_unchecked(reader) };
#[project]
let Lines { reader, buf, bytes, read } = self;
let n = ready!(read_line_internal(reader, buf, bytes, read, cx))?;
if n == 0 && buf.is_empty() {
return Poll::Ready(None)
Expand Down
34 changes: 11 additions & 23 deletions futures-util/src/try_future/try_flatten_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@ use core::pin::Pin;
use futures_core::future::TryFuture;
use futures_core::stream::{FusedStream, Stream, TryStream};
use futures_core::task::{Context, Poll};
use pin_utils::unsafe_pinned;
use pin_project::{pin_project, unsafe_project};

/// Stream for the [`try_flatten_stream`](super::TryFutureExt::try_flatten_stream) method.
#[unsafe_project]
#[must_use = "streams do nothing unless polled"]
pub struct TryFlattenStream<Fut>
where
Fut: TryFuture,
{
#[pin]
state: State<Fut, Fut::Ok>,
}

Expand All @@ -19,8 +21,6 @@ where
Fut: TryFuture,
Fut::Ok: TryStream<Error = Fut::Error>,
{
unsafe_pinned!(state: State<Fut, Fut::Ok>);

pub(super) fn new(future: Fut) -> Self {
Self {
state: State::Future(future)
Expand All @@ -40,31 +40,17 @@ where
}
}

#[unsafe_project(Unpin)]
#[derive(Debug)]
enum State<Fut, St> {
// future is not yet called or called and not ready
Future(Fut),
Future(#[pin] Fut),
// future resolved to Stream
Stream(St),
Stream(#[pin] St),
// future resolved to error
Done,
}

impl<Fut, St> State<Fut, St> {
fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> State<Pin<&'a mut Fut>, Pin<&'a mut St>> {
// safety: data is never moved via the resulting &mut reference
match unsafe { Pin::get_unchecked_mut(self) } {
// safety: the future we're re-pinning here will never be moved;
// it will just be polled, then dropped in place
State::Future(f) => State::Future(unsafe { Pin::new_unchecked(f) }),
// safety: the stream we're repinning here will never be moved;
// it will just be polled, then dropped in place
State::Stream(s) => State::Stream(unsafe { Pin::new_unchecked(s) }),
State::Done => State::Done,
}
}
}

impl<Fut> FusedStream for TryFlattenStream<Fut>
where
Fut: TryFuture,
Expand All @@ -86,21 +72,23 @@ where
{
type Item = Result<<Fut::Ok as TryStream>::Ok, Fut::Error>;

#[pin_project(self)]
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
match self.as_mut().state().get_pin_mut() {
#[project]
match self.state.as_mut().project() {
State::Future(f) => {
match ready!(f.try_poll(cx)) {
Ok(stream) => {
// Future resolved to stream.
// We do not return, but poll that
// stream in the next loop iteration.
self.as_mut().state().set(State::Stream(stream));
self.state.set(State::Stream(stream));
}
Err(e) => {
// Future resolved to error.
// We have neither a pollable stream nor a future.
self.as_mut().state().set(State::Done);
self.state.set(State::Done);
return Poll::Ready(Some(Err(e)));
}
}
Expand Down

0 comments on commit 7d19cc1

Please sign in to comment.