From 8152e6259d1ae7ceecfb691e7c3c3b3f4b577482 Mon Sep 17 00:00:00 2001 From: Nell Shamrell Date: Fri, 25 Sep 2020 19:09:17 -0700 Subject: [PATCH 01/47] adds async stream rfc Signed-off-by: Nell Shamrell --- text/0000-async-stream.md | 758 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 758 insertions(+) create mode 100644 text/0000-async-stream.md diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md new file mode 100644 index 00000000000..5f8b8fde326 --- /dev/null +++ b/text/0000-async-stream.md @@ -0,0 +1,758 @@ +- Feature Name: `async_stream` +- Start Date: 2020-09-29 +- RFC PR: [rust-lang/rfcs#0000](https://github.com/rust-lang/rfcs/pull/0000) +- Rust Issue: [rust-lang/rust#0000](https://github.com/rust-lang/rust/issues/0000) + +# Summary +[summary]: #summary + +Introduce the `Stream` trait into the standard library, using the +design from `futures`. Redirect the `Stream` trait definition in the +`futures-core` crate (which is "pub-used" by the `futures` crate) to the standard library. + +# Motivation +[motivation]: #motivation + +Streams are a core async abstraction. We want to enable portable libraries that +produce/consume streams without being tied to a particular executor. + +People can do this currently using the `Stream` trait defined in the +[futures](https://crates.io/crates/futures) crate. However, the +stability guarantee of that trait would be clearer if it were added +to the standard library. For example, if [Tokio](https://tokio.rs/) +wishes to declare a [5 year stability period](http://smallcultfollowing.com/babysteps/blog/2020/02/11/async-interview-6-eliza-weisman/#communicating-stability), +having the stream trait in the standard library means there are no concerns +about the trait changing during that time ([citation](http://smallcultfollowing.com/babysteps/blog/2019/12/23/async-interview-3-carl-lerche/#what-should-we-do-next-stabilize-stream)). + +## Examples of crates that are consuming streams + +### async-h1 + +* [async-h1](https://docs.rs/async-h1)'s server implementation takes `TcpStream` instances produced by a `TcpListener` in a loop. + +### async-sse + +* [async-sse](https://docs.rs/async-sse/) parses incoming buffers into a stream of messages. + +## Why a shared trait? + +We eventually want dedicated syntax for working with streams, which will require a shared trait. +This includes a trait for producing streams and a trait for consuming streams. + +# Guide-level explanation +[guide-level-explanation]: #guide-level-explanation + +A "stream" is the async version of an iterator. The `Stream` trait +matches the definition of an [iterator], except that the `next` method +is defined to "poll" for the next item. In other words, where the +`next` method on an iterator simply computes (and returns) the next +item in the sequence, the `poll_next` method on stream asks if the +next item is ready. If so, it will be returned, but otherwise +`poll_next` will return [`Poll::pending`]. Just as with a [`Future`], +returning [`Poll::pending`] implies that the stream has arranged for +the current task to be re-awoken when the data is ready. + +[iterator]: https://doc.rust-lang.org/std/iter/trait.Iterator.html +[`Future`]: https://doc.rust-lang.org/std/future/trait.Future.html +[`Poll::pending`]: https://doc.rust-lang.org/std/task/enum.Poll.html#variant.Pending + +```rust +// Defined in std::stream module +pub trait Stream { + // Core items: + type Item; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; + + // Optional optimization hint, just like with iterators: + #[inline] + fn size_hint(&self) -> (usize, Option) { + (0, None) + } + + // Convenience methods (covered later on in the RFC): + fn next(&mut self) -> Next<'_, Self> + where + Self: Unpin; +} +``` + +The arguments to `poll_next` match that of the [`Future::poll`] method: + +* The self must be a pinned reference, ensuring both unique access to + the stream and that the stream value itself will not move. Pinning + allows the stream to save pointers into itself when it suspends, + which will be required to support generator syntax at some point. +* The [context] `cx` defines details of the current task. In particular, + it gives access to the [`Waker`] for the task, which will allow the + task to be re-awoken once data is ready. + +[`Future::poll`]: https://doc.rust-lang.org/std/future/trait.Future.html#tymethod.poll +[pinned]: https://doc.rust-lang.org/std/pin/struct.Pin.html +[context]: https://doc.rust-lang.org/std/task/struct.Context.html +[Waker]: https://doc.rust-lang.org/std/task/struct.Waker.html + +### Why does next require Self:Unpin? + +When drafting this RFC, there was a [good deal of discussion](https://github.com/rust-lang/wg-async-foundations/pull/15#discussion_r452482084) around why the `next` method requires `Self:Unpin`. + +To understand this, it helps to take a closer look at the definition of `Next` (this struct is further discussed later in this RFC) in the [futures-util crate](https://docs.rs/futures-util/0.3.5/src/futures_util/stream/stream/next.rs.html#10-12). + +```rust +pub struct Next<'a, St: ?Sized> { + stream: &'a mut St, +} +``` +Since `Stream::poll_next` takes a pinned reference, the next future needs `S` to be `Unpin` in order to safely construct a `Pin<&mut S>` from a `&mut S`. + +An alternative approach we could take would be to have the `next` method take `Pin<&mut S>`, rather than `&mut S`. However, this would require pinning even when the type is `Unpin`. The current approach requires pinning only when the type is not `Unpin`. + +At the moment, we do not see many `Unpin!` streams in practice (though there is one in the [futures-intrusive crate](https://github.com/Matthias247/futures-intrusive/blob/master/src/channel/mpmc.rs#L565-L625)). Where they will become important is when we introduce async generators, as discussed in [future-possibilities]. + +In summary, an async stream: +* has a pinned receiver +* that takes `cx` context +* and returns a `Poll` + +## Initial impls + +There are a number of simple "bridge" impls that are also provided: + +```rust +impl Stream for Box +where + S: Stream + Unpin + ?Sized, +{ + type Item = ::Item +} + +impl Stream for &mut S +where + S: Stream + Unpin + ?Sized, +{ + type Item = ::Item; +} + +impl Stream for Pin

+where + P: DerefMut + Unpin, + T::Target: Stream, +{ + type Item = ::Item; +} + +impl Stream for AssertUnwindSafe +where + S: Stream, +{ + type Item = ::Item; +} +``` + +## Next method/struct + +We should also implement a next method, similar to [the implementation in the futures-util crate](https://docs.rs/futures-util/0.3.5/src/futures_util/stream/stream/next.rs.html#10-12). + +In general, we have purposefully kept the core trait definition minimal. +There are a number of useful extension methods that are available, for example, +in the futures-stream crate, but we have not included them because they involve +closure arguments, and we have not yet finalized the design of async closures. + +However, the core methods alone are extremely unergonomic. You can't even iterate +over the items coming out of the stream. Therefore, we include a few minimal +convenience methods that are not dependent on any unstable features. Most notably, next + +```rust +/// A future that advances the stream and returns the next value. +/// +/// This `struct` is created by the [`next`] method on [`Stream`]. See its +/// documentation for more. +/// +/// [`next`]: trait.Stream.html#method.next +/// [`Stream`]: trait.Stream.html +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Next<'a, S: ?Sized> { + stream: &'a mut S, +} + +impl Unpin for Next<'_, St> {} + +impl<'a, St: ?Sized + Stream + Unpin> Next<'a, St> { + pub(super) fn new(stream: &'a mut St) -> Self { + Next { stream } + } +} + +impl Future for Next<'_, St> { + type Output = Option; + + fn poll( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll { + Pin::new(&mut *self.stream).poll_next(cx) + } +} +``` + +This would allow a user to await on a future: + +```rust +while let Some(v) = stream.next().await { + +} +``` + +We could also consdier adding a try_next? function, allowing +a user to write: + +```rust +while let Some(x) = s.try_next().await? +``` + +One thing to note, if a user is using an older version of `futures-util`, +they would experience ambiguity when trying to use the `next` method that +is added to the standard library (and redirected to from `futures-core`). + +This can be done as a non-breaking change, but would require everyone to +upgrade rustc. We will want to create a transition plan on what this +means for users and pick the timing carefully. + +# Reference-level explanation +[reference-level-explanation]: #reference-level-explanation + +This section goes into details about various aspects of the design and +why they ended up the way they did. + +## Where does `Stream` live in the std lib? + +`Stream` will live in the `core::stream` module and be re-exported as `std::stream`. + +It is possible that it could live in another area as well, though this followes +the pattern of `core::future`. + +## Why use a `poll` method? + +An alternative design for the stream trait would be to have a trait +that defines an async `next` method: + +```rust +trait Stream { + type Item; + + async fn next(&mut self) -> Option; +} +``` + +Unfortunately, async methods in traits are not currently supported, +and there [are a number of challenges to be +resolved](https://rust-lang.github.io/wg-async-foundations/design_notes/async_fn_in_traits.html) +before they can be added. + +Moreover, it is not clear yet how to make traits that contain async +functions be `dyn` safe, and it is imporant to be able to pass around `dyn +Stream` values without the need to monomorphize the functions that work +with them. + +Unfortunately, the use of poll does mean that it is harder to write +stream implementations. The long-term fix for this, discussed in the [Future possiblilities](future-possibilities) section, is dedicated [generator syntax]. + +# Drawbacks +[drawbacks]: #drawbacks + +Why should we *not* do this? + +# Rationale and alternatives +[rationale-and-alternatives]: #rationale-and-alternatives + +## Where should stream live? + +As mentioned above, `core::stream` is analogous to `core::future`. But, do we want to find +some other naming scheme that can scale up to other future additions, such as io traits or channels? + +# Prior art +[prior-art]: #prior-art + +Discuss prior art, both the good and the bad, in relation to this proposal. + +This section is intended to encourage you as an author to think about the lessons from other languages, provide readers of your RFC with a fuller picture. +If there is no prior art, that is fine - your ideas are interesting to us whether they are brand new or if it is an adaptation from other languages. + +Note that while precedent set by other languages is some motivation, it does not on its own motivate an RFC. +Please also take into consideration that rust sometimes intentionally diverges from common language features. + +# Unresolved questions +[unresolved-questions]: #unresolved-questions + +- What parts of the design do you expect to resolve through the RFC process before this gets merged? +- What parts of the design do you expect to resolve through the implementation of this feature before stabilization? +- What related issues do you consider out of scope for this RFC that could be addressed in the future independently of the solution that comes out of this RFC? + +# Future possibilities +[future-possibilities]: #future-possibilities + +## Convenience methods + +The `Iterator` trait defines a number of useful combinators, like +`map`. The `Stream` trait being proposed here does not include any +such conveniences. Instead, they are available via extension traits, +such as the [`StreamExt`] trait offered by the [`futures`] crate. + +[`StreamExt`]: https://docs.rs/futures/0.3.5/futures/stream/trait.StreamExt.html +[`futures`]: https://crates.io/crates/futures + +The reason that we have chosen to exclude combinators is that a number +of them would require access to async closures. As of this writing, +async closures are unstable and there are a number of [outstanding +design issues] to be resolved before they are added. Therefore, we've +decided to enable progress on the stream trait by stabilizing a core, +and to come back to the problem of extending it with combinators. + +[outstanding design issues]: https://rust-lang.github.io/wg-async-foundations/design_notes/async_closures.html + +This path does carry some risk. Adding combinator methods can cause +existing code to stop compiling due to the ambiguities in method +resolution. We have had problems in the past with attempting to migrate +iterator helper methods from `itertools` for this same reason. + +While such breakage is technically permitted by our semver guidelines, +it would obviously be best to avoid it, or at least to go to great +lengths to mitigate its effects. One option would be to extend the +language to allow method resolution to "favor" the extension trait in +existing code, perhaps as part of an edition migration. + +Designing such a migration feature is out of scope for this RFC. + +## IntoStream / FromStream traits + +### IntoStream + +**Iterators** + +Iterators have an `IntoIterator` that is used with `for` loops to convert items of other types to an iterator. + +```rust +pub trait IntoIterator where + ::Item == Self::Item, +{ + type Item; + + type IntoIter: Iterator; + + fn into_iter(self) -> Self::IntoIter; +} +``` + +Examples taken from the Rust docs on [for loops and into_iter](https://doc.rust-lang.org/std/iter/index.html#for-loops-and-intoiterator) + +* `for x in iter` uses `impl IntoIterator for T` + +```rust +let values = vec![1, 2, 3, 4, 5]; + +for x in values { + println!("{}", x); +} +``` + +Desugars to: + +```rust +let values = vec![1, 2, 3, 4, 5]; +{ + let result = match IntoIterator::into_iter(values) { + mut iter => loop { + let next; + match iter.next() { + Some(val) => next = val, + None => break, + }; + let x = next; + let () = { println!("{}", x); }; + }, + }; + result +} +``` +* `for x in &iter` uses `impl IntoIterator for &T` +* `for x in &mut iter` uses `impl IntoIterator for &mut T` + +**Streams** + +We may want a trait similar to this for `Stream`. The `IntoStream` trait would provide a way to convert something into a `Stream`. + +This trait could look like this: + +```rust +pub trait IntoStream +where + ::Item == Self::Item, +{ + type Item; + + type IntoStream: Stream; + + fn into_stream(self) -> Self::IntoStream; +} +``` + +This trait (as expressed by @taiki-e in [a comment on a draft of this RFC](https://github.com/rust-lang/wg-async-foundations/pull/15/files#r449880986)) makes it easy to write streams in combination with [async stream](https://github.com/taiki-e/futures-async-stream). For example: + +```rust +type S(usize); + +impl IntoStream for S { + type Item = usize; + type IntoStream: impl Stream; + + fn into_stream(self) -> Self::IntoStream { + #[stream] + async move { + for i in 0..self.0 { + yield i; + } + } + } +} +``` + +### FromStream + +**Iterators** + +Iterators have an `FromIterator` that is used to convert iterators into another type. + +```rust +pub trait FromIterator { + + fn from_iter(iter: T) -> Self + where + T: IntoIterator; +} +``` + +It should be noted that this trait is rarely used directly, instead used through Iterator's collect method ([source](https://doc.rust-lang.org/std/iter/trait.FromIterator.html)). + +```rust +pub trait Interator { + fn collect(self) -> B + where + B: FromIterator, + { ... } +} +``` + +Examples taken from the Rust docs on [iter and collect](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.collect) + + +```rust +let a = [1, 2, 3]; + +let doubled: Vec = a.iter() + .map(|&x| x * 2) + .collect(); + +``` + +**Streams** + +We may want a trait similar to this for `Stream`. The `FromStream` trait would provide way to convert a `Stream` into another type. + +This trait could look like this: + +```rust +pub trait FromStream { + async fn from_stream(stream: T) -> Self + where + T: IntoStream; +} +``` + +We could potentially include a collect method for Stream as well. + +```rust +pub trait Stream { + async fn collect(self) -> B + where + B: FromStream, + { ... } +} +``` + +When drafting this RFC, there was [discussion](https://github.com/rust-lang/wg-async-foundations/pull/15#discussion_r451182595) +about whether to implement from_stream for all T where `T: FromIterator` as well. +`FromStream` is perhaps more general than `FromIterator` because the await point is allowed to suspend execution of the +current function, but doesn't have too. Therefore, many (if not all) existing impls of `FromIterator` would work +for `FromStream` as well. While this would be a good point for a future discussion, it is not in the scope of this RFC. + +## Other Traits + +Eventually, we may also want to add some (if not all) of the roster of traits we found useful for `Iterator`. + +[async_std::stream](https://docs.rs/async-std/1.6.0/async_std/stream/index.html) has created several async counterparts to the traits in [std::iter](https://doc.rust-lang.org/std/iter/). These include: + +* DoubleEndedStream: A stream able to yield elements from both ends. +* ExactSizeStream: A stream that knows its exact length. +* Extend: Extends a collection with the contents of a stream. +* FromStream: Conversion from a Stream. +* FusedStream: A stream that always continues to yield None when exhausted. +* IntoStream: Conversion into a Stream. +* Product: Trait to represent types that can be created by multiplying the elements of a stream. +* Stream: An asynchronous stream of values. +* Sum: Trait to represent types that can be created by summing up a stream. + +As detailed in previous sections, the migrations to add these traits are out of scope for this RFC. + +## Async iteration syntax + +Currently, if someone wishes to iterate over a `Stream` as defined in the `futures` crate, +they are not able to use `for` loops, they must use `while let` and `next/try_next` instead. + +We may wish to extend the `for` loop so that it works over streams as well. + +```rust +#[async] +for elem in stream { ... } +``` + +One of the complications of using `while let` syntax is the need to pin. +A `for` loop syntax that takes ownership of the stream would be able to +do the pinning for you. + +We may not want to make sequential processing "too easy" without also enabling +parallel/concurrent processing, which people frequently want. One challenge is +that parallel processing wouldn't naively permit early returns and other complex +control flow. We could add a `par_stream()` method, similar to +[Rayon's](https://github.com/rayon-rs/rayon) `par_iter()`. + +Designing this extension is out of scope for this RFC. However, it could be prototyped using procedural macros today. + +## "Lending" streams + +There has been much discussion around lending streams (also referred to as attached streams). + +### Definitions + +[Source](https://smallcultfollowing.com/babysteps/blog/2019/12/10/async-interview-2-cramertj-part-2/#the-need-for-streaming-streams-and-iterators) + + +In a **lending** stream (also known as an "attached" stream), the `Item` that gets +returned by `Stream` may be borrowed from `self`. It can only be used as long as +the `self` reference remains live. + +In a **non-lending** stream (also known as a "detached" stream), the `Item` that +gets returned by `Stream` is "detached" from self. This means it can be stored +and moved about independently from `self`. + +This RFC does not cover the addition of lending streams (streams as implemented through +this RFC are all non-lending streams). + +We can add the `Stream` trait to the standard library now and delay +adding in this distinction between the two types of streams - lending and +non-lending. The advantage of this is it would allow us to copy the `Stream` +trait from `futures` largely 'as is'. + +The disadvantage of this is functions that consume streams would +first be written to work with `Stream`, and then potentially have +to be rewritten later to work with `LendingStream`s. + +### Current Stream Trait + +```rust +pub trait Stream { + type Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; + + #[inline] + fn size_hint(&self) -> (usize, Option) { + (0, None) + } +} +``` + +This trait, like `Iterator`, always gives ownership of each item back to its caller. This offers flexibility - +such as the ability to spawn off futures processing each item in parallel. + +### Potential Lending Stream Trait + +```rust +impl LendingStream for S +where + S: Stream, +{ + type Item<'_> = S::Item; + + fn poll_next<'s>( + self: Pin<&'s mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + Stream::poll_next(self, cx) + } +} +``` + +This is a "conversion" trait such that anything which implements `Stream` can also implement +`Lending Stream`. + +This trait captures the case we re-use internal buffers. This would be less flexible for +consumers, but potentially more efficient. Types could implement the `LendingStream` +where they need to re-use an internal buffer and `Stream` if they do not. There is room for both. + +We would also need to pursue the same design for iterators - whether through adding two traits +or one new trait with a "conversion" from the old trait. + +This also brings up the question of whether we should allow conversion in the opposite way - if +every non-lending stream can become a lending one, should _some_ lending streams be able to +become non-lending ones? + +**Coherence** + +The impl above has a problem. As the Rust language stands today, we cannot cleanly convert +impl Stream to impl LendingStream due to a coherence conflict. + +If you have other impls like: + +```rust +impl Stream for Box where T: Stream +``` + +and + +```rust +impl LendingStream for Box where T: LendingStream +``` + +There is a coherence conflict for `Box`, so presumably it will fail the coherence rules. + +[More examples are available here](https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=a667a7560f8dc97ab82a780e27dfc9eb). + +Resolving this would require either an explicit “wrapper” step or else some form of language extension. + +It should be noted that the same applies to Iterator, it is not unique to Stream. + +We may eventually want a super trait relationship available in the Rust language + +```rust +trait Stream: LendingStream +``` + +This would allow us to leverage `default impl`. + +These use cases for lending/non-lending streams need more thought, which is part of the reason it +is out of the scope of this particular RFC. + +## Generator syntax +[generator syntax]: #generator-syntax + +In the future, we may wish to introduce a new form of function - +`gen fn` in iterators and `async gen` in async code that +can contain `yield` statements. Calling such a function would +yield a `impl Iterator` or `impl Stream`, for sync and async +respectively. Given an "attached" or "borrowed" stream, the generator +yield could return references to local variables. Given a "detached" +or "owned" stream, the generator yield could return things +that you own or things that were borrowed from your caller. + +### In Iterators + +```rust +gen fn foo() -> Value { + yield value; +} +``` + +After desugaring, this would result in a function like: + +```rust +fn foo() -> impl Iterator +``` + +### In Async Code + +```rust +async gen fn foo() -> Value +``` + +After desugaring would result in a function like: + +```rust +fn foo() -> impl Stream +``` + +If we introduce `-> Stream` first, we will have to permit `LendingStream` in the future. +Additionally, if we introduce `LendingStream` later, we'll have to figure out how +to convert a `LendingStream` into a `Stream` seamlessly. + +### Differences between Iterator generators and Async generators + +We want `Stream` and `Iterator` to work as analogously as possible, including when used with generators. However, in the current design, there is a crucial difference between the two. + +Consider Iterator's core `next` method: + +```rust +pub trait Iterator { + type Item; + + fn next(&mut self) -> Option; +} +``` +And then compare it to the proposed Stream `next` method: + +```rust +pub trait Stream { + type Item; + + fn next(&mut self) -> Next<'_, Self> + where + Self: Unpin; +} +``` + +Iterator does not require pinning its core next method. In order for a `gen fn` to operate with the Iterator ecosystem, there must be some kind of initial pinning step that converts its result into an iterator. This will be tricky, since you can't return a pinned value except by boxing. + +The general shape will be: + +```rust +gen_fn().pin_somehow().adapter1().adapter2() +``` + +With streams, the core interface _is_ pinned, so pinning occurs at the last moment. + +The general shape would be + +```rust +async_gen_fn().adapter1().adapter2().pin_somehow() +``` + +Pinning at the end, like with a stream, lets you build and return those adapters and then apply pinning at the end. This may be the more efficient setup and implies that, in order to have a `gen fn` that produces iterators, we will need to potentially disallow borrowing yields or implement some kind of `PinnedIterator` trait that can be "adapted" into an iterator by pinning. + +For example: + +```rust +trait PinIterator { + type Item; + fn next(self: Pin<&mut Self>) -> Self::Item; + // combinators can go here (duplicating Iterator for the most part) +} +impl + DerefMut> Iterator for Pin

{ + type Item = ::Item; + fn next(&mut self) -> Self::Item { self.as_mut().next() } +} + +// this would be nice.. but would lead to name resolution ambiguity for our combinators 😬 +default impl PinIterator for T { .. } +``` +Pinning also applies to the design of AsyncRead/AsyncWrite, which currently uses Pin even through there is no clear plan to make them implemented with generator type syntax. The asyncification of a signature is current understood as pinned receiver + context arg + return poll. + +### Yielding + +It would be useful to be able to yield from inside a for loop, as long as the for loop is +over a borrowed input and not something owned by the stack frame. + +In the spirit of experimentation, boats has written the [propane](https://github.com/withoutboats/propane) +crate. This crate includes a `#[propane] fn` that changes the function signature +to return `impl Iterator` and lets you `yield`. The non-async version uses +`static generator` that is currently in nightly only. + +Further designing generator functions is out of the scope of this RFC. \ No newline at end of file From 16c48707f555974495623031c9845c0301c7d27d Mon Sep 17 00:00:00 2001 From: Nell Shamrell-Harrington Date: Tue, 29 Sep 2020 10:31:55 -0700 Subject: [PATCH 02/47] Update text/0000-async-stream.md Co-authored-by: Florian Gilcher --- text/0000-async-stream.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index 5f8b8fde326..51e4a8df0d9 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -159,7 +159,7 @@ closure arguments, and we have not yet finalized the design of async closures. However, the core methods alone are extremely unergonomic. You can't even iterate over the items coming out of the stream. Therefore, we include a few minimal -convenience methods that are not dependent on any unstable features. Most notably, next +convenience methods that are not dependent on any unstable features, such as `next`. ```rust /// A future that advances the stream and returns the next value. @@ -755,4 +755,4 @@ crate. This crate includes a `#[propane] fn` that changes the function signature to return `impl Iterator` and lets you `yield`. The non-async version uses `static generator` that is currently in nightly only. -Further designing generator functions is out of the scope of this RFC. \ No newline at end of file +Further designing generator functions is out of the scope of this RFC. From 20a064b53b9391c2b1923143da14ce8124200d53 Mon Sep 17 00:00:00 2001 From: Nell Shamrell-Harrington Date: Tue, 29 Sep 2020 10:32:05 -0700 Subject: [PATCH 03/47] Update text/0000-async-stream.md Co-authored-by: Florian Gilcher --- text/0000-async-stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index 51e4a8df0d9..a1fc65b5f3a 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -203,7 +203,7 @@ while let Some(v) = stream.next().await { } ``` -We could also consdier adding a try_next? function, allowing +We could also consider adding a try_next? function, allowing a user to write: ```rust From 5a3de0fa4ea9efbffef252cb3aeecf402c51c39c Mon Sep 17 00:00:00 2001 From: Nell Shamrell-Harrington Date: Tue, 29 Sep 2020 10:32:12 -0700 Subject: [PATCH 04/47] Update text/0000-async-stream.md Co-authored-by: Waffle Lapkin --- text/0000-async-stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index a1fc65b5f3a..5b45d41244d 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -89,7 +89,7 @@ The arguments to `poll_next` match that of the [`Future::poll`] method: [`Future::poll`]: https://doc.rust-lang.org/std/future/trait.Future.html#tymethod.poll [pinned]: https://doc.rust-lang.org/std/pin/struct.Pin.html [context]: https://doc.rust-lang.org/std/task/struct.Context.html -[Waker]: https://doc.rust-lang.org/std/task/struct.Waker.html +[`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html ### Why does next require Self:Unpin? From a145be9d03a99e023754d40990597e52c667ac54 Mon Sep 17 00:00:00 2001 From: Nell Shamrell-Harrington Date: Tue, 29 Sep 2020 16:33:42 -0700 Subject: [PATCH 05/47] Update text/0000-async-stream.md Co-authored-by: kennytm --- text/0000-async-stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index 5b45d41244d..d327920e76e 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -434,7 +434,7 @@ pub trait FromIterator { It should be noted that this trait is rarely used directly, instead used through Iterator's collect method ([source](https://doc.rust-lang.org/std/iter/trait.FromIterator.html)). ```rust -pub trait Interator { +pub trait Iterator { fn collect(self) -> B where B: FromIterator, From f65f0997029158810479637b10aa87a5dbbae05e Mon Sep 17 00:00:00 2001 From: Nell Shamrell-Harrington Date: Tue, 29 Sep 2020 16:33:52 -0700 Subject: [PATCH 06/47] Update text/0000-async-stream.md Co-authored-by: Taiki Endo --- text/0000-async-stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index d327920e76e..9280021f887 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -106,7 +106,7 @@ Since `Stream::poll_next` takes a pinned reference, the next future needs `S` to An alternative approach we could take would be to have the `next` method take `Pin<&mut S>`, rather than `&mut S`. However, this would require pinning even when the type is `Unpin`. The current approach requires pinning only when the type is not `Unpin`. -At the moment, we do not see many `Unpin!` streams in practice (though there is one in the [futures-intrusive crate](https://github.com/Matthias247/futures-intrusive/blob/master/src/channel/mpmc.rs#L565-L625)). Where they will become important is when we introduce async generators, as discussed in [future-possibilities]. +At the moment, we do not see many `!Unpin` streams in practice (though there is one in the [futures-intrusive crate](https://github.com/Matthias247/futures-intrusive/blob/master/src/channel/mpmc.rs#L565-L625)). Where they will become important is when we introduce async generators, as discussed in [future-possibilities]. In summary, an async stream: * has a pinned receiver From 310b0b3662a2861bdd2c474468a402b2f3bf3d30 Mon Sep 17 00:00:00 2001 From: Nell Shamrell Date: Sat, 26 Sep 2020 00:51:48 -0700 Subject: [PATCH 07/47] remove boilerplate questions Signed-off-by: Nell Shamrell --- text/0000-async-stream.md | 4 ---- 1 file changed, 4 deletions(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index 9280021f887..49b277b627b 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -284,10 +284,6 @@ Please also take into consideration that rust sometimes intentionally diverges f # Unresolved questions [unresolved-questions]: #unresolved-questions -- What parts of the design do you expect to resolve through the RFC process before this gets merged? -- What parts of the design do you expect to resolve through the implementation of this feature before stabilization? -- What related issues do you consider out of scope for this RFC that could be addressed in the future independently of the solution that comes out of this RFC? - # Future possibilities [future-possibilities]: #future-possibilities From e0d66d4aef4a91e6ccd44a0b68ce5016ad0e66d7 Mon Sep 17 00:00:00 2001 From: Nell Shamrell Date: Wed, 30 Sep 2020 22:06:57 -0700 Subject: [PATCH 08/47] adds more information about coverting an iterator into a stream Signed-off-by: Nell Shamrell --- text/0000-async-stream.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index 49b277b627b..250aede55df 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -481,6 +481,14 @@ about whether to implement from_stream for all T where `T: FromIterator` as well current function, but doesn't have too. Therefore, many (if not all) existing impls of `FromIterator` would work for `FromStream` as well. While this would be a good point for a future discussion, it is not in the scope of this RFC. +## Converting an Iterator to a Stream + +If a user wishes to convert an Iterator to a Stream, they may not be able to use IntoStream because a blanked impl for Iterator would conflict with more specific impls they may wish to write. Having a function that takes an `impl Iterator` and returns an `impl Stream` would be quite helpful. + +The [async-std](https://github.com/async-rs/async-std) crate has [stream::from_iter](https://docs.rs/async-std/1.6.5/async_std/stream/fn.from_iter.html). The [futures-rs](https://github.com/rust-lang/futures-rs) crate has [stream::iter](https://docs.rs/futures/0.3.5/futures/stream/fn.iter.html). Either of these approaches could work once we expose `Stream` in the standard library. + +Adding this functionality is out of the scope of this RFC, but is something we should revisit once `Stream` is in the standard library. + ## Other Traits Eventually, we may also want to add some (if not all) of the roster of traits we found useful for `Iterator`. From 7b84d1cd276be5a048a8e82c35af6ef2a6736f41 Mon Sep 17 00:00:00 2001 From: Nell Shamrell Date: Wed, 30 Sep 2020 22:11:24 -0700 Subject: [PATCH 09/47] adds clarifying statement regarding !Unpin streams Signed-off-by: Nell Shamrell --- text/0000-async-stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index 250aede55df..bfe355d0966 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -106,7 +106,7 @@ Since `Stream::poll_next` takes a pinned reference, the next future needs `S` to An alternative approach we could take would be to have the `next` method take `Pin<&mut S>`, rather than `&mut S`. However, this would require pinning even when the type is `Unpin`. The current approach requires pinning only when the type is not `Unpin`. -At the moment, we do not see many `!Unpin` streams in practice (though there is one in the [futures-intrusive crate](https://github.com/Matthias247/futures-intrusive/blob/master/src/channel/mpmc.rs#L565-L625)). Where they will become important is when we introduce async generators, as discussed in [future-possibilities]. +We currently do see some `!Unpin` streams in practice (including in the [futures-intrusive crate](https://github.com/Matthias247/futures-intrusive/blob/master/src/channel/mpmc.rs#L565-L625)). We also see see `stream.then(|_| async {})` resulting in an `!Unpin` stream. Where `!Unpin` streams will become important is when we introduce async generators, as discussed in [future-possibilities]. In summary, an async stream: * has a pinned receiver From 512f41ae5b10467ff870f53714d12076322ac24b Mon Sep 17 00:00:00 2001 From: Nell Shamrell Date: Sat, 17 Oct 2020 14:58:20 -0700 Subject: [PATCH 10/47] corrects some typos Signed-off-by: Nell Shamrell --- text/0000-async-stream.md | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index bfe355d0966..647160c2e21 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -106,7 +106,7 @@ Since `Stream::poll_next` takes a pinned reference, the next future needs `S` to An alternative approach we could take would be to have the `next` method take `Pin<&mut S>`, rather than `&mut S`. However, this would require pinning even when the type is `Unpin`. The current approach requires pinning only when the type is not `Unpin`. -We currently do see some `!Unpin` streams in practice (including in the [futures-intrusive crate](https://github.com/Matthias247/futures-intrusive/blob/master/src/channel/mpmc.rs#L565-L625)). We also see see `stream.then(|_| async {})` resulting in an `!Unpin` stream. Where `!Unpin` streams will become important is when we introduce async generators, as discussed in [future-possibilities]. +We currently do see some `!Unpin` streams in practice (including in the [futures-intrusive crate](https://github.com/Matthias247/futures-intrusive/blob/master/src/channel/mpmc.rs#L565-L625)). We also see `stream.then(|_| async {})` resulting in an `!Unpin` stream. Where `!Unpin` streams will become important is when we introduce async generators, as discussed in [future-possibilities]. In summary, an async stream: * has a pinned receiver @@ -228,7 +228,7 @@ why they ended up the way they did. `Stream` will live in the `core::stream` module and be re-exported as `std::stream`. -It is possible that it could live in another area as well, though this followes +It is possible that it could live in another area as well, though this follows the pattern of `core::future`. ## Why use a `poll` method? @@ -250,7 +250,7 @@ resolved](https://rust-lang.github.io/wg-async-foundations/design_notes/async_fn before they can be added. Moreover, it is not clear yet how to make traits that contain async -functions be `dyn` safe, and it is imporant to be able to pass around `dyn +functions be `dyn` safe, and it is important to be able to pass around `dyn Stream` values without the need to monomorphize the functions that work with them. @@ -339,7 +339,7 @@ pub trait IntoIterator where } ``` -Examples taken from the Rust docs on [for loops and into_iter](https://doc.rust-lang.org/std/iter/index.html#for-loops-and-intoiterator) +Examples are taken from the Rust docs on [for loops and into_iter](https://doc.rust-lang.org/std/iter/index.html#for-loops-and-intoiterator) * `for x in iter` uses `impl IntoIterator for T` @@ -438,7 +438,7 @@ pub trait Iterator { } ``` -Examples taken from the Rust docs on [iter and collect](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.collect) +Examples are taken from the Rust docs on [iter and collect](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.collect) ```rust @@ -452,7 +452,7 @@ let doubled: Vec = a.iter() **Streams** -We may want a trait similar to this for `Stream`. The `FromStream` trait would provide way to convert a `Stream` into another type. +We may want a trait similar to this for `Stream`. The `FromStream` trait would provide a way to convert a `Stream` into another type. This trait could look like this: @@ -747,7 +747,7 @@ impl + DerefMut> Iterator for Pin

{ // this would be nice.. but would lead to name resolution ambiguity for our combinators 😬 default impl PinIterator for T { .. } ``` -Pinning also applies to the design of AsyncRead/AsyncWrite, which currently uses Pin even through there is no clear plan to make them implemented with generator type syntax. The asyncification of a signature is current understood as pinned receiver + context arg + return poll. +Pinning also applies to the design of AsyncRead/AsyncWrite, which currently uses Pin even through there is no clear plan to make them implemented with generator type syntax. The asyncification of a signature is currently understood as pinned receiver + context arg + return poll. ### Yielding From 0e44c56f818e4720bc3aae0be910ca576600bd7c Mon Sep 17 00:00:00 2001 From: Nell Shamrell Date: Sat, 14 Nov 2020 12:31:37 -0800 Subject: [PATCH 11/47] adds clarification around motivation and the guide level explanation Signed-off-by: Nell Shamrell --- text/0000-async-stream.md | 71 ++++++++++++++++++++++++++------------- 1 file changed, 48 insertions(+), 23 deletions(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index 647160c2e21..3dc91cd18ec 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -13,18 +13,26 @@ design from `futures`. Redirect the `Stream` trait definition in the # Motivation [motivation]: #motivation -Streams are a core async abstraction. We want to enable portable libraries that -produce/consume streams without being tied to a particular executor. +Streams are a core async abstraction. These behave similarly to `Iterator`, +but rather than blocking between each item yield, it allows other +tasks to run while it waits. People can do this currently using the `Stream` trait defined in the -[futures](https://crates.io/crates/futures) crate. However, the -stability guarantee of that trait would be clearer if it were added -to the standard library. For example, if [Tokio](https://tokio.rs/) +[futures](https://crates.io/crates/futures) crate. However, we would like +to add `Stream` to the standard library. + +In addition to adding the `Stream` trait to the standard library, we also want to provide basic +ergonomic methods required to use streams effectively. These include the +`next` and `poll_next` methods. Without these methods, `Streams` would feel much more +difficult to use. Were we to not include them, users would likely immediately reach out +for them either through writing their own version or using an external crate. + +Including `Stream` in the standard library would also clarify the stability guarantees of the trait. For example, if [Tokio](https://tokio.rs/) wishes to declare a [5 year stability period](http://smallcultfollowing.com/babysteps/blog/2020/02/11/async-interview-6-eliza-weisman/#communicating-stability), -having the stream trait in the standard library means there are no concerns +having the `Stream` trait in the standard library means there are no concerns about the trait changing during that time ([citation](http://smallcultfollowing.com/babysteps/blog/2019/12/23/async-interview-3-carl-lerche/#what-should-we-do-next-stabilize-stream)). -## Examples of crates that are consuming streams +## Examples of current crates that are consuming streams ### async-h1 @@ -42,19 +50,23 @@ This includes a trait for producing streams and a trait for consuming streams. # Guide-level explanation [guide-level-explanation]: #guide-level-explanation -A "stream" is the async version of an iterator. The `Stream` trait -matches the definition of an [iterator], except that the `next` method -is defined to "poll" for the next item. In other words, where the -`next` method on an iterator simply computes (and returns) the next -item in the sequence, the `poll_next` method on stream asks if the -next item is ready. If so, it will be returned, but otherwise -`poll_next` will return [`Poll::pending`]. Just as with a [`Future`], -returning [`Poll::pending`] implies that the stream has arranged for -the current task to be re-awoken when the data is ready. +A "stream" is the async version of an [iterator]. + +## poll_next method + +The `Iterator` trait includes a `next` method, which computes and +returns the next item in the sequence. + +When implementing a `Stream`, users will define a `poll_next` method. +The `poll_next' method asks if the next item is ready. If so, it returns +the item. Otherwise, `poll_next` will return [`Poll::pending`]. + +Just as with a [`Future`], returning [`Poll::pending`] +implies that the stream has arranged for the current task to be re-awoken when the data is ready. [iterator]: https://doc.rust-lang.org/std/iter/trait.Iterator.html [`Future`]: https://doc.rust-lang.org/std/future/trait.Future.html -[`Poll::pending`]: https://doc.rust-lang.org/std/task/enum.Poll.html#variant.Pending +[`Poll::pending`]: https://doc.rust-lang.org ```rust // Defined in std::stream module @@ -69,7 +81,7 @@ pub trait Stream { (0, None) } - // Convenience methods (covered later on in the RFC): + // Convenience method (covered later on in the RFC): fn next(&mut self) -> Next<'_, Self> where Self: Unpin; @@ -91,6 +103,24 @@ The arguments to `poll_next` match that of the [`Future::poll`] method: [context]: https://doc.rust-lang.org/std/task/struct.Context.html [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html +## next method + +As @yoshuawuyts states in their [pull request which adds `core::stream::Stream` to the standard library]: + +Unlike `Iterator`, `Stream` makes a distinction between the [`poll_next`] +method which is used when implementing a `Stream`, and the [`next`] method +which is used when consuming a stream. Consumers of `Stream` only need to +consider [`next`], which when called, returns a future which yields +[`Option`]``. + +The future returned by [`next`] will yield `Some(Item)` as long as there are +elements, and once they've all been exhausted, will yield `None` to indicate +that iteration is finished. If we're waiting on something asynchronous to +resolve, the future will wait until the stream is ready to yield again. + +Individual streams may choose to resume iteration, and so calling [`next`] +again may or may not eventually yield `Some(Item)` again at some point. + ### Why does next require Self:Unpin? When drafting this RFC, there was a [good deal of discussion](https://github.com/rust-lang/wg-async-foundations/pull/15#discussion_r452482084) around why the `next` method requires `Self:Unpin`. @@ -108,11 +138,6 @@ An alternative approach we could take would be to have the `next` method take `P We currently do see some `!Unpin` streams in practice (including in the [futures-intrusive crate](https://github.com/Matthias247/futures-intrusive/blob/master/src/channel/mpmc.rs#L565-L625)). We also see `stream.then(|_| async {})` resulting in an `!Unpin` stream. Where `!Unpin` streams will become important is when we introduce async generators, as discussed in [future-possibilities]. -In summary, an async stream: -* has a pinned receiver -* that takes `cx` context -* and returns a `Poll` - ## Initial impls There are a number of simple "bridge" impls that are also provided: From c7ace1657db4b301847c453f9f584274eec08c1e Mon Sep 17 00:00:00 2001 From: Nell Shamrell-Harrington Date: Tue, 17 Nov 2020 17:26:07 -0800 Subject: [PATCH 12/47] Update text/0000-async-stream.md Co-authored-by: Taiki Endo --- text/0000-async-stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index 3dc91cd18ec..8ccc09c3297 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -66,7 +66,7 @@ implies that the stream has arranged for the current task to be re-awoken when t [iterator]: https://doc.rust-lang.org/std/iter/trait.Iterator.html [`Future`]: https://doc.rust-lang.org/std/future/trait.Future.html -[`Poll::pending`]: https://doc.rust-lang.org +[`Poll::Pending`]: https://doc.rust-lang.org/std/task/enum.Poll.html#variant.Pending ```rust // Defined in std::stream module From 1948fad0b661d2374aac9901df99834d13ffe62f Mon Sep 17 00:00:00 2001 From: Nell Shamrell-Harrington Date: Tue, 17 Nov 2020 17:26:27 -0800 Subject: [PATCH 13/47] Update text/0000-async-stream.md Co-authored-by: Taiki Endo --- text/0000-async-stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index 8ccc09c3297..2e2354e458a 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -105,7 +105,7 @@ The arguments to `poll_next` match that of the [`Future::poll`] method: ## next method -As @yoshuawuyts states in their [pull request which adds `core::stream::Stream` to the standard library]: +As @yoshuawuyts states in their [pull request which adds `core::stream::Stream` to the standard library](https://github.com/rust-lang/rust/pull/79023): Unlike `Iterator`, `Stream` makes a distinction between the [`poll_next`] method which is used when implementing a `Stream`, and the [`next`] method From 0ba1e677d7a3b3c352dc281fd846f6b527f68293 Mon Sep 17 00:00:00 2001 From: Nell Shamrell-Harrington Date: Tue, 17 Nov 2020 17:26:36 -0800 Subject: [PATCH 14/47] Update text/0000-async-stream.md Co-authored-by: Taiki Endo --- text/0000-async-stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index 2e2354e458a..a569ab72135 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -59,7 +59,7 @@ returns the next item in the sequence. When implementing a `Stream`, users will define a `poll_next` method. The `poll_next' method asks if the next item is ready. If so, it returns -the item. Otherwise, `poll_next` will return [`Poll::pending`]. +the item. Otherwise, `poll_next` will return [`Poll::Pending`]. Just as with a [`Future`], returning [`Poll::pending`] implies that the stream has arranged for the current task to be re-awoken when the data is ready. From 414665a2028a09f59fa336ab1080f2bcbd7e6c59 Mon Sep 17 00:00:00 2001 From: Nell Shamrell-Harrington Date: Tue, 17 Nov 2020 17:26:53 -0800 Subject: [PATCH 15/47] Update text/0000-async-stream.md Co-authored-by: Taiki Endo --- text/0000-async-stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index a569ab72135..f1b0379db56 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -179,7 +179,7 @@ We should also implement a next method, similar to [the implementation in the fu In general, we have purposefully kept the core trait definition minimal. There are a number of useful extension methods that are available, for example, -in the futures-stream crate, but we have not included them because they involve +in the `futures-util` crate, but we have not included them because they involve closure arguments, and we have not yet finalized the design of async closures. However, the core methods alone are extremely unergonomic. You can't even iterate From fc92f81ef4e80fd93d6b087baffa0ce971776f64 Mon Sep 17 00:00:00 2001 From: Nell Shamrell-Harrington Date: Tue, 17 Nov 2020 17:27:04 -0800 Subject: [PATCH 16/47] Update text/0000-async-stream.md Co-authored-by: Taiki Endo --- text/0000-async-stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index f1b0379db56..8eeb0702311 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -228,7 +228,7 @@ while let Some(v) = stream.next().await { } ``` -We could also consider adding a try_next? function, allowing +We could also consider adding a `try_next` method, allowing a user to write: ```rust From 6f6da399126981a3ee1fbaa1e0452196894d8dac Mon Sep 17 00:00:00 2001 From: Nell Shamrell Date: Tue, 17 Nov 2020 17:47:24 -0800 Subject: [PATCH 17/47] adds examples of how a user would interact with the poll_next and next methods Signed-off-by: Nell Shamrell --- text/0000-async-stream.md | 95 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 92 insertions(+), 3 deletions(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index 8eeb0702311..30f9c637c23 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -50,12 +50,12 @@ This includes a trait for producing streams and a trait for consuming streams. # Guide-level explanation [guide-level-explanation]: #guide-level-explanation -A "stream" is the async version of an [iterator]. +A "stream" is the async version of an [iterator]. + +The `Iterator` trait includes a `next` method, which computes and returns the next item in the sequence. The `Stream` trait includes two core methods for defining and interacting with streams - `poll_next` and 'next`. ## poll_next method -The `Iterator` trait includes a `next` method, which computes and -returns the next item in the sequence. When implementing a `Stream`, users will define a `poll_next` method. The `poll_next' method asks if the next item is ready. If so, it returns @@ -103,6 +103,59 @@ The arguments to `poll_next` match that of the [`Future::poll`] method: [context]: https://doc.rust-lang.org/std/task/struct.Context.html [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html +### Usage + +A user could create a stream as follows (Example taken from @yoshuawuyt's [implementation pull request](https://github.com/rust-lang/rust/pull/79023)). + +Creating a stream involves two steps: creating a `struct` to + hold the stream's state, and then implementing [`Stream`] for that + `struct`. + + Let's make a stream named `Counter` which counts from `1` to `5`: + +```rust +#![feature(async_stream)] +# use core::stream::Stream; +# use core::task::{Context, Poll}; +# use core::pin::Pin; + +// First, the struct: + +/// A stream which counts from one to five +struct Counter { + count: usize, +} + +// we want our count to start at one, so let's add a new() method to help. +// This isn't strictly necessary, but is convenient. Note that we start +// `count` at zero, we'll see why in `poll_next()`'s implementation below. +impl Counter { + fn new() -> Counter { + Counter { count: 0 } + } +} + +// Then, we implement `Stream` for our `Counter`: + +impl Stream for Counter { + // we will be counting with usize + type Item = usize; + + // poll_next() is the only required method + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // Increment our count. This is why we started at zero. + self.count += 1; + + // Check to see if we've finished counting or not. + if self.count < 6 { + Poll::Ready(Some(self.count)) + } else { + Poll::Ready(None) + } + } +} +``` + ## next method As @yoshuawuyts states in their [pull request which adds `core::stream::Stream` to the standard library](https://github.com/rust-lang/rust/pull/79023): @@ -121,6 +174,42 @@ resolve, the future will wait until the stream is ready to yield again. Individual streams may choose to resume iteration, and so calling [`next`] again may or may not eventually yield `Some(Item)` again at some point. +This is similar to the `Future` trait. The `Future::poll_next` method is rarely called +directly, it is almost always used to implement other Futures. Interacting +with futures is done through `async/await`. + +A `Stream` by itself is not useful - we need some way to interact with it. +To interact with it, we need the `next` method - all other interactions +with `Stream` can be expressed through it. Without the `next` method, +streams cannot be consumed. + +### Usage + +Continuing the example of `Stream` implemented on a struct called `Counter`, the user would interact with the stream like so: + +```rust +let mut counter = Counter::new(); + +let x = counter.next().await.unwrap(); +println!("{}", x); + +let x = counter.next().await.unwrap(); +println!("{}", x); + +let x = counter.next().await.unwrap(); +println!("{}", x); + +let x = counter.next().await.unwrap(); +println!("{}", x); + +let x = counter.next().await.unwrap(); +println!("{}", x); +# +} +``` + +This would print `1` through `5`, each on their own line. + ### Why does next require Self:Unpin? When drafting this RFC, there was a [good deal of discussion](https://github.com/rust-lang/wg-async-foundations/pull/15#discussion_r452482084) around why the `next` method requires `Self:Unpin`. From c7af6827ab687dd2f075c5ce2749d8d1200bda3a Mon Sep 17 00:00:00 2001 From: Nell Shamrell-Harrington Date: Tue, 17 Nov 2020 17:47:50 -0800 Subject: [PATCH 18/47] Update text/0000-async-stream.md Co-authored-by: Taiki Endo --- text/0000-async-stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index 30f9c637c23..2ae64af7606 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -61,7 +61,7 @@ When implementing a `Stream`, users will define a `poll_next` method. The `poll_next' method asks if the next item is ready. If so, it returns the item. Otherwise, `poll_next` will return [`Poll::Pending`]. -Just as with a [`Future`], returning [`Poll::pending`] +Just as with a [`Future`], returning [`Poll::Pending`] implies that the stream has arranged for the current task to be re-awoken when the data is ready. [iterator]: https://doc.rust-lang.org/std/iter/trait.Iterator.html From a1b60e8b486aec4346cf8b5a9aa5da7294b40775 Mon Sep 17 00:00:00 2001 From: Nell Shamrell Date: Tue, 17 Nov 2020 18:12:31 -0800 Subject: [PATCH 19/47] adds in more discussion around pinning with async generators Signed-off-by: Nell Shamrell --- text/0000-async-stream.md | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index 2ae64af7606..be29066ba54 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -214,6 +214,8 @@ This would print `1` through `5`, each on their own line. When drafting this RFC, there was a [good deal of discussion](https://github.com/rust-lang/wg-async-foundations/pull/15#discussion_r452482084) around why the `next` method requires `Self:Unpin`. +In particular, there was concern around how this would affect the use of generators in the future. Generators are discussed in the [Future possiblilities](future-possibilities) section later in this RFC. + To understand this, it helps to take a closer look at the definition of `Next` (this struct is further discussed later in this RFC) in the [futures-util crate](https://docs.rs/futures-util/0.3.5/src/futures_util/stream/stream/next.rs.html#10-12). ```rust @@ -804,7 +806,7 @@ to convert a `LendingStream` into a `Stream` seamlessly. ### Differences between Iterator generators and Async generators -We want `Stream` and `Iterator` to work as analogously as possible, including when used with generators. However, in the current design, there is a crucial difference between the two. +We want `Stream` and `Iterator` to work as analogously as possible, including when used with generators. However, in the current design, there are some crucial differences between the two. Consider Iterator's core `next` method: @@ -861,8 +863,13 @@ impl + DerefMut> Iterator for Pin

{ // this would be nice.. but would lead to name resolution ambiguity for our combinators 😬 default impl PinIterator for T { .. } ``` + Pinning also applies to the design of AsyncRead/AsyncWrite, which currently uses Pin even through there is no clear plan to make them implemented with generator type syntax. The asyncification of a signature is currently understood as pinned receiver + context arg + return poll. +Another key difference between `Iterators` and `Streams` is that that futures are ultimately passed to some executor API like spawn which expects a static future. To achieve that, the futures contain all the state they need and references are internal to that state. Iterators are almost never required to be 'static by the APIs that consume them. + +It is, admittedly, somewhat confusing to have Async generators require Pinning and Iterator generators to not require pinning, users may feel they are creating code in an unnatural way when using the Async generators. This will need to be discussed more when generators are proposed in the future. + ### Yielding It would be useful to be able to yield from inside a for loop, as long as the for loop is From 0b806efbd28f453f01d154b4d9c46babfc3f1a3e Mon Sep 17 00:00:00 2001 From: Nell Shamrell-Harrington Date: Tue, 17 Nov 2020 18:17:41 -0800 Subject: [PATCH 20/47] Update text/0000-async-stream.md Co-authored-by: Taiki Endo --- text/0000-async-stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index be29066ba54..c228dff944f 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -58,7 +58,7 @@ The `Iterator` trait includes a `next` method, which computes and returns the ne When implementing a `Stream`, users will define a `poll_next` method. -The `poll_next' method asks if the next item is ready. If so, it returns +The `poll_next` method asks if the next item is ready. If so, it returns the item. Otherwise, `poll_next` will return [`Poll::Pending`]. Just as with a [`Future`], returning [`Poll::Pending`] From ece4f4cc06d23fcb1ca7168942dd5f0f99f3b051 Mon Sep 17 00:00:00 2001 From: Nell Shamrell Date: Tue, 1 Dec 2020 18:25:04 -0800 Subject: [PATCH 21/47] correct reference to Future::poll method Signed-off-by: Nell Shamrell --- text/0000-async-stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index c228dff944f..7ef9f605634 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -174,7 +174,7 @@ resolve, the future will wait until the stream is ready to yield again. Individual streams may choose to resume iteration, and so calling [`next`] again may or may not eventually yield `Some(Item)` again at some point. -This is similar to the `Future` trait. The `Future::poll_next` method is rarely called +This is similar to the `Future` trait. The `Future::poll` method is rarely called directly, it is almost always used to implement other Futures. Interacting with futures is done through `async/await`. From 8b52cee53208a9053db6f178e2a1aa3f820d9e57 Mon Sep 17 00:00:00 2001 From: Nell Shamrell Date: Tue, 1 Dec 2020 18:30:59 -0800 Subject: [PATCH 22/47] combines both next method sections Signed-off-by: Nell Shamrell --- text/0000-async-stream.md | 141 +++++++++++++++++++------------------- 1 file changed, 70 insertions(+), 71 deletions(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index 7ef9f605634..4788a507e4b 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -56,7 +56,6 @@ The `Iterator` trait includes a `next` method, which computes and returns the ne ## poll_next method - When implementing a `Stream`, users will define a `poll_next` method. The `poll_next` method asks if the next item is ready. If so, it returns the item. Otherwise, `poll_next` will return [`Poll::Pending`]. @@ -158,6 +157,17 @@ impl Stream for Counter { ## next method +We should also implement a next method, similar to [the implementation in the futures-util crate](https://docs.rs/futures-util/0.3.5/src/futures_util/stream/stream/next.rs.html#10-12). + +In general, we have purposefully kept the core trait definition minimal. +There are a number of useful extension methods that are available, for example, +in the `futures-util` crate, but we have not included them because they involve +closure arguments, and we have not yet finalized the design of async closures. + +However, the core methods alone are extremely unergonomic. You can't even iterate +over the items coming out of the stream. Therefore, we include a few minimal +convenience methods that are not dependent on any unstable features, such as `next`. + As @yoshuawuyts states in their [pull request which adds `core::stream::Stream` to the standard library](https://github.com/rust-lang/rust/pull/79023): Unlike `Iterator`, `Stream` makes a distinction between the [`poll_next`] @@ -183,6 +193,65 @@ To interact with it, we need the `next` method - all other interactions with `Stream` can be expressed through it. Without the `next` method, streams cannot be consumed. +```rust +/// A future that advances the stream and returns the next value. +/// +/// This `struct` is created by the [`next`] method on [`Stream`]. See its +/// documentation for more. +/// +/// [`next`]: trait.Stream.html#method.next +/// [`Stream`]: trait.Stream.html +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Next<'a, S: ?Sized> { + stream: &'a mut S, +} + +impl Unpin for Next<'_, St> {} + +impl<'a, St: ?Sized + Stream + Unpin> Next<'a, St> { + pub(super) fn new(stream: &'a mut St) -> Self { + Next { stream } + } +} + +impl Future for Next<'_, St> { + type Output = Option; + + fn poll( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll { + Pin::new(&mut *self.stream).poll_next(cx) + } +} +``` + +This would allow a user to await on a future: + +```rust +while let Some(v) = stream.next().await { + +} +``` + +We could also consider adding a `try_next` method, allowing +a user to write: + +```rust +while let Some(x) = s.try_next().await? +``` + +One thing to note, if a user is using an older version of `futures-util`, +they would experience ambiguity when trying to use the `next` method that +is added to the standard library (and redirected to from `futures-core`). + +This can be done as a non-breaking change, but would require everyone to +upgrade rustc. We will want to create a transition plan on what this +means for users and pick the timing carefully. + + + ### Usage Continuing the example of `Stream` implemented on a struct called `Counter`, the user would interact with the stream like so: @@ -264,76 +333,6 @@ where } ``` -## Next method/struct - -We should also implement a next method, similar to [the implementation in the futures-util crate](https://docs.rs/futures-util/0.3.5/src/futures_util/stream/stream/next.rs.html#10-12). - -In general, we have purposefully kept the core trait definition minimal. -There are a number of useful extension methods that are available, for example, -in the `futures-util` crate, but we have not included them because they involve -closure arguments, and we have not yet finalized the design of async closures. - -However, the core methods alone are extremely unergonomic. You can't even iterate -over the items coming out of the stream. Therefore, we include a few minimal -convenience methods that are not dependent on any unstable features, such as `next`. - -```rust -/// A future that advances the stream and returns the next value. -/// -/// This `struct` is created by the [`next`] method on [`Stream`]. See its -/// documentation for more. -/// -/// [`next`]: trait.Stream.html#method.next -/// [`Stream`]: trait.Stream.html -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Next<'a, S: ?Sized> { - stream: &'a mut S, -} - -impl Unpin for Next<'_, St> {} - -impl<'a, St: ?Sized + Stream + Unpin> Next<'a, St> { - pub(super) fn new(stream: &'a mut St) -> Self { - Next { stream } - } -} - -impl Future for Next<'_, St> { - type Output = Option; - - fn poll( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll { - Pin::new(&mut *self.stream).poll_next(cx) - } -} -``` - -This would allow a user to await on a future: - -```rust -while let Some(v) = stream.next().await { - -} -``` - -We could also consider adding a `try_next` method, allowing -a user to write: - -```rust -while let Some(x) = s.try_next().await? -``` - -One thing to note, if a user is using an older version of `futures-util`, -they would experience ambiguity when trying to use the `next` method that -is added to the standard library (and redirected to from `futures-core`). - -This can be done as a non-breaking change, but would require everyone to -upgrade rustc. We will want to create a transition plan on what this -means for users and pick the timing carefully. - # Reference-level explanation [reference-level-explanation]: #reference-level-explanation From 174e2186364d3a3b20284291c66df18436d603df Mon Sep 17 00:00:00 2001 From: Nell Shamrell Date: Tue, 1 Dec 2020 18:52:31 -0800 Subject: [PATCH 23/47] adds examples for addressing !Unpin streams Signed-off-by: Nell Shamrell --- text/0000-async-stream.md | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index 4788a507e4b..1a85e13adc9 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -250,8 +250,6 @@ This can be done as a non-breaking change, but would require everyone to upgrade rustc. We will want to create a transition plan on what this means for users and pick the timing carefully. - - ### Usage Continuing the example of `Stream` implemented on a struct called `Counter`, the user would interact with the stream like so: @@ -285,7 +283,7 @@ When drafting this RFC, there was a [good deal of discussion](https://github.com In particular, there was concern around how this would affect the use of generators in the future. Generators are discussed in the [Future possiblilities](future-possibilities) section later in this RFC. -To understand this, it helps to take a closer look at the definition of `Next` (this struct is further discussed later in this RFC) in the [futures-util crate](https://docs.rs/futures-util/0.3.5/src/futures_util/stream/stream/next.rs.html#10-12). +To understand this, it helps to take a closer look at the definition of `Next` in the [futures-util crate](https://docs.rs/futures-util/0.3.5/src/futures_util/stream/stream/next.rs.html#10-12). ```rust pub struct Next<'a, St: ?Sized> { @@ -298,6 +296,23 @@ An alternative approach we could take would be to have the `next` method take `P We currently do see some `!Unpin` streams in practice (including in the [futures-intrusive crate](https://github.com/Matthias247/futures-intrusive/blob/master/src/channel/mpmc.rs#L565-L625)). We also see `stream.then(|_| async {})` resulting in an `!Unpin` stream. Where `!Unpin` streams will become important is when we introduce async generators, as discussed in [future-possibilities]. +This could potentially be addressed by using the [pin_mut!](https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.1/futures/macro.pin_mut.html) macro in the `futures` crate (it is also included in the [pin-utils](https://docs.rs/pin-utils/0.1.0/pin_utils/macro.pin_mut.html) crate) or by using [Box::pin](https://doc.rust-lang.org/std/boxed/struct.Box.html#method.pin) to pin the stream. + +```rust +let stream = non_unpin_stream(); +pin_mut!(stream); +while let Some(item) = stream.next().await { + ... +} +``` + +```rust +let stream = Box::pin(non_unpin_stream()); +while let Some(item) = stream.next().await { + ... +} +``` + ## Initial impls There are a number of simple "bridge" impls that are also provided: From 3fe320e7e3f5d451a5ca988cc20df320e196fc72 Mon Sep 17 00:00:00 2001 From: Nell Shamrell Date: Tue, 1 Dec 2020 19:00:40 -0800 Subject: [PATCH 24/47] add more clarification to try_next method Signed-off-by: Nell Shamrell --- text/0000-async-stream.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index 1a85e13adc9..a0c4a90b91f 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -242,6 +242,12 @@ a user to write: while let Some(x) = s.try_next().await? ``` +But this could also be written as: + +```rust +while let Some(x) = s.next().await.transpose()? +``` + One thing to note, if a user is using an older version of `futures-util`, they would experience ambiguity when trying to use the `next` method that is added to the standard library (and redirected to from `futures-core`). From ec04b64901e58673de43577465974cdcc931cd3f Mon Sep 17 00:00:00 2001 From: Nell Shamrell-Harrington Date: Tue, 1 Dec 2020 19:53:15 -0800 Subject: [PATCH 25/47] Update text/0000-async-stream.md Co-authored-by: Tyler Mandry --- text/0000-async-stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index a0c4a90b91f..27f6f44d76b 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -52,7 +52,7 @@ This includes a trait for producing streams and a trait for consuming streams. A "stream" is the async version of an [iterator]. -The `Iterator` trait includes a `next` method, which computes and returns the next item in the sequence. The `Stream` trait includes two core methods for defining and interacting with streams - `poll_next` and 'next`. +The `Iterator` trait includes a `next` method, which computes and returns the next item in the sequence. The `Stream` trait includes two core methods for defining and interacting with streams - `poll_next` and `next`. ## poll_next method From 0ed396382edb5922e3d2fdeee43ec3f391d9a882 Mon Sep 17 00:00:00 2001 From: Nell Shamrell-Harrington Date: Tue, 1 Dec 2020 19:53:26 -0800 Subject: [PATCH 26/47] Update text/0000-async-stream.md Co-authored-by: Tyler Mandry --- text/0000-async-stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index 27f6f44d76b..792a4c09d41 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -164,7 +164,7 @@ There are a number of useful extension methods that are available, for example, in the `futures-util` crate, but we have not included them because they involve closure arguments, and we have not yet finalized the design of async closures. -However, the core methods alone are extremely unergonomic. You can't even iterate +However, the core `poll_next` method alone is extremely unergonomic. You can't even iterate over the items coming out of the stream. Therefore, we include a few minimal convenience methods that are not dependent on any unstable features, such as `next`. From c54ca857dcd2a284f212f19804102d6bb2a60af1 Mon Sep 17 00:00:00 2001 From: Nell Shamrell-Harrington Date: Tue, 1 Dec 2020 19:53:41 -0800 Subject: [PATCH 27/47] Update text/0000-async-stream.md Co-authored-by: Tyler Mandry --- text/0000-async-stream.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index 792a4c09d41..8d4c82fe22c 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -83,7 +83,8 @@ pub trait Stream { // Convenience method (covered later on in the RFC): fn next(&mut self) -> Next<'_, Self> where - Self: Unpin; + Self: Unpin + { .. } } ``` From a39d5c3a6a2f3a77f5b73677afae8e8dba2336c9 Mon Sep 17 00:00:00 2001 From: Nell Shamrell Date: Tue, 1 Dec 2020 19:59:16 -0800 Subject: [PATCH 28/47] addresses some suggestions from comments on the RFC Signed-off-by: Nell Shamrell --- text/0000-async-stream.md | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index 8d4c82fe22c..ce1a2d47a9e 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -22,9 +22,8 @@ People can do this currently using the `Stream` trait defined in the to add `Stream` to the standard library. In addition to adding the `Stream` trait to the standard library, we also want to provide basic -ergonomic methods required to use streams effectively. These include the -`next` and `poll_next` methods. Without these methods, `Streams` would feel much more -difficult to use. Were we to not include them, users would likely immediately reach out +ergonomic methods required to use streams effectively. These includes the `next` method. Without this method, `Streams` would feel much more +difficult to use. Were we to not include it, users would likely immediately reach out for them either through writing their own version or using an external crate. Including `Stream` in the standard library would also clarify the stability guarantees of the trait. For example, if [Tokio](https://tokio.rs/) @@ -171,28 +170,25 @@ convenience methods that are not dependent on any unstable features, such as `ne As @yoshuawuyts states in their [pull request which adds `core::stream::Stream` to the standard library](https://github.com/rust-lang/rust/pull/79023): -Unlike `Iterator`, `Stream` makes a distinction between the [`poll_next`] -method which is used when implementing a `Stream`, and the [`next`] method +Unlike `Iterator`, `Stream` makes a distinction between the `poll_next` +method which is used when implementing a `Stream`, and the `next` method which is used when consuming a stream. Consumers of `Stream` only need to -consider [`next`], which when called, returns a future which yields -[`Option`]``. +consider `next`, which when called, returns a future which yields +`Option`. -The future returned by [`next`] will yield `Some(Item)` as long as there are +The future returned by `next` will yield `Some(Item)` as long as there are elements, and once they've all been exhausted, will yield `None` to indicate that iteration is finished. If we're waiting on something asynchronous to resolve, the future will wait until the stream is ready to yield again. -Individual streams may choose to resume iteration, and so calling [`next`] +Individual streams may choose to resume iteration, and so calling `next` again may or may not eventually yield `Some(Item)` again at some point. This is similar to the `Future` trait. The `Future::poll` method is rarely called directly, it is almost always used to implement other Futures. Interacting with futures is done through `async/await`. -A `Stream` by itself is not useful - we need some way to interact with it. -To interact with it, we need the `next` method - all other interactions -with `Stream` can be expressed through it. Without the `next` method, -streams cannot be consumed. +We need something like the `next()` method in order to iterate over the stream directly in an `async` block or function. It is essentially an adapter from `Stream` to `Future`. ```rust /// A future that advances the stream and returns the next value. From 0ee1505e3ba00bb8fdcf117a31f18948331b26e9 Mon Sep 17 00:00:00 2001 From: Nell Shamrell Date: Tue, 1 Dec 2020 20:09:33 -0800 Subject: [PATCH 29/47] clarify wording around resuming iteration once a Stream returns None Signed-off-by: Nell Shamrell --- text/0000-async-stream.md | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index ce1a2d47a9e..9e160559f98 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -102,6 +102,12 @@ The arguments to `poll_next` match that of the [`Future::poll`] method: [context]: https://doc.rust-lang.org/std/task/struct.Context.html [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html +As defined in the [`Future` docs](https://doc.rust-lang.org/stable/std/future/trait.Future.html): + +Once a future has completed (returned Ready from poll), calling its poll method again may panic, block forever, or cause other kinds of problems; the Future trait places no requirements on the effects of such a call. However, as the poll method is not marked unsafe, Rust's usual rules apply: calls must never cause undefined behavior (memory corruption, incorrect use of unsafe functions, or the like), regardless of the future's state. + +This RFC takes a similar approach to Streams. In general, once a Stream returns `None`, clients should not poll it again. However, individual streams may choose to resume iteration if the user wishes them to, in which case the stream may or may not eventually yield `Some(Item)` again at some point. + ### Usage A user could create a stream as follows (Example taken from @yoshuawuyt's [implementation pull request](https://github.com/rust-lang/rust/pull/79023)). @@ -181,8 +187,9 @@ elements, and once they've all been exhausted, will yield `None` to indicate that iteration is finished. If we're waiting on something asynchronous to resolve, the future will wait until the stream is ready to yield again. -Individual streams may choose to resume iteration, and so calling `next` -again may or may not eventually yield `Some(Item)` again at some point. +As defined in the [`Future` docs](https://doc.rust-lang.org/stable/std/future/trait.Future.html): + +Once a future has completed (returned Ready from poll), calling its poll method again may panic, block forever, or cause other kinds of problems; the Future trait places no requirements on the effects of such a call. However, as the poll method is not marked unsafe, Rust's usual rules apply: calls must never cause undefined behavior (memory corruption, incorrect use of unsafe functions, or the like), regardless of the future's state. This is similar to the `Future` trait. The `Future::poll` method is rarely called directly, it is almost always used to implement other Futures. Interacting From 1e4cc8c45755e8f494e87f5e6fb0adc12d5c5982 Mon Sep 17 00:00:00 2001 From: Nell Shamrell-Harrington Date: Wed, 2 Dec 2020 18:48:29 -0800 Subject: [PATCH 30/47] Update text/0000-async-stream.md Co-authored-by: Tyler Mandry --- text/0000-async-stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index 9e160559f98..23fc1786b7e 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -618,7 +618,7 @@ pub trait Stream { When drafting this RFC, there was [discussion](https://github.com/rust-lang/wg-async-foundations/pull/15#discussion_r451182595) about whether to implement from_stream for all T where `T: FromIterator` as well. `FromStream` is perhaps more general than `FromIterator` because the await point is allowed to suspend execution of the -current function, but doesn't have too. Therefore, many (if not all) existing impls of `FromIterator` would work +current function, but doesn't have to. Therefore, many (if not all) existing impls of `FromIterator` would work for `FromStream` as well. While this would be a good point for a future discussion, it is not in the scope of this RFC. ## Converting an Iterator to a Stream From ec1b422e8f9097d68bd71a4757a01bcfd96ee9f8 Mon Sep 17 00:00:00 2001 From: Nell Shamrell Date: Sat, 5 Dec 2020 19:27:36 -0800 Subject: [PATCH 31/47] removes unneeded template text Signed-off-by: Nell Shamrell --- text/0000-async-stream.md | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index 23fc1786b7e..dc18ba51c82 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -397,11 +397,6 @@ with them. Unfortunately, the use of poll does mean that it is harder to write stream implementations. The long-term fix for this, discussed in the [Future possiblilities](future-possibilities) section, is dedicated [generator syntax]. -# Drawbacks -[drawbacks]: #drawbacks - -Why should we *not* do this? - # Rationale and alternatives [rationale-and-alternatives]: #rationale-and-alternatives @@ -410,20 +405,6 @@ Why should we *not* do this? As mentioned above, `core::stream` is analogous to `core::future`. But, do we want to find some other naming scheme that can scale up to other future additions, such as io traits or channels? -# Prior art -[prior-art]: #prior-art - -Discuss prior art, both the good and the bad, in relation to this proposal. - -This section is intended to encourage you as an author to think about the lessons from other languages, provide readers of your RFC with a fuller picture. -If there is no prior art, that is fine - your ideas are interesting to us whether they are brand new or if it is an adaptation from other languages. - -Note that while precedent set by other languages is some motivation, it does not on its own motivate an RFC. -Please also take into consideration that rust sometimes intentionally diverges from common language features. - -# Unresolved questions -[unresolved-questions]: #unresolved-questions - # Future possibilities [future-possibilities]: #future-possibilities From 7bf481dc13cfb056efe178b213b43647c13af76f Mon Sep 17 00:00:00 2001 From: Nell Shamrell-Harrington Date: Sat, 5 Dec 2020 19:29:13 -0800 Subject: [PATCH 32/47] Update text/0000-async-stream.md Co-authored-by: Tyler Mandry --- text/0000-async-stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index dc18ba51c82..931dd9bdd53 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -257,7 +257,7 @@ they would experience ambiguity when trying to use the `next` method that is added to the standard library (and redirected to from `futures-core`). This can be done as a non-breaking change, but would require everyone to -upgrade rustc. We will want to create a transition plan on what this +upgrade `futures`. We will want to create a transition plan on what this means for users and pick the timing carefully. ### Usage From 8c424582d191c0ec9a9093c52f0c0b47cde43659 Mon Sep 17 00:00:00 2001 From: Nell Shamrell Date: Sat, 5 Dec 2020 19:30:36 -0800 Subject: [PATCH 33/47] add possiblity of custom compiler error message when requiring users to upgrade futures Signed-off-by: Nell Shamrell --- text/0000-async-stream.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index 931dd9bdd53..46e497a548f 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -258,7 +258,8 @@ is added to the standard library (and redirected to from `futures-core`). This can be done as a non-breaking change, but would require everyone to upgrade `futures`. We will want to create a transition plan on what this -means for users and pick the timing carefully. +means for users and pick the timing carefully. We may be able to ease this +somewhat with a custom compiler error message. ### Usage From e0647fe23dc7e62651b740f0c1a05b3782a028f0 Mon Sep 17 00:00:00 2001 From: Nell Shamrell Date: Sat, 5 Dec 2020 19:32:24 -0800 Subject: [PATCH 34/47] adds note that lending streams depend on Generic Associated Types Signed-off-by: Nell Shamrell --- text/0000-async-stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index 46e497a548f..9efca567a7d 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -671,7 +671,7 @@ gets returned by `Stream` is "detached" from self. This means it can be stored and moved about independently from `self`. This RFC does not cover the addition of lending streams (streams as implemented through -this RFC are all non-lending streams). +this RFC are all non-lending streams). Lending streams depend on [Generic Associated Types](https://rust-lang.github.io/rfcs/1598-generic_associated_types.html), which are not (at the time of this RFC) stable. We can add the `Stream` trait to the standard library now and delay adding in this distinction between the two types of streams - lending and From 33673448b87611dc5273f6d7838b761d699b5da8 Mon Sep 17 00:00:00 2001 From: Nell Shamrell-Harrington Date: Sat, 5 Dec 2020 19:33:04 -0800 Subject: [PATCH 35/47] Update text/0000-async-stream.md Co-authored-by: Tyler Mandry --- text/0000-async-stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index 9efca567a7d..839b8c643c9 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -721,7 +721,7 @@ where This is a "conversion" trait such that anything which implements `Stream` can also implement `Lending Stream`. -This trait captures the case we re-use internal buffers. This would be less flexible for +This trait captures the case where we re-use internal buffers. This would be less flexible for consumers, but potentially more efficient. Types could implement the `LendingStream` where they need to re-use an internal buffer and `Stream` if they do not. There is room for both. From 418ccd6e0f404f7f9dd7bb21016583998c21af7f Mon Sep 17 00:00:00 2001 From: Nell Shamrell-Harrington Date: Sat, 5 Dec 2020 19:33:25 -0800 Subject: [PATCH 36/47] Update text/0000-async-stream.md Co-authored-by: Tyler Mandry --- text/0000-async-stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index 839b8c643c9..c2b4e81994b 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -719,7 +719,7 @@ where ``` This is a "conversion" trait such that anything which implements `Stream` can also implement -`Lending Stream`. +`LendingStream`. This trait captures the case where we re-use internal buffers. This would be less flexible for consumers, but potentially more efficient. Types could implement the `LendingStream` From 997d698392c1c3babcebde80751f3f2a9e31530e Mon Sep 17 00:00:00 2001 From: Nell Shamrell-Harrington Date: Sat, 5 Dec 2020 19:33:35 -0800 Subject: [PATCH 37/47] Update text/0000-async-stream.md Co-authored-by: Tyler Mandry --- text/0000-async-stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index c2b4e81994b..51c15014a9f 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -806,7 +806,7 @@ After desugaring would result in a function like: fn foo() -> impl Stream ``` -If we introduce `-> Stream` first, we will have to permit `LendingStream` in the future. +If we introduce `-> impl Stream` first, we will have to permit `LendingStream` in the future. Additionally, if we introduce `LendingStream` later, we'll have to figure out how to convert a `LendingStream` into a `Stream` seamlessly. From b7d2cef8ac4a44d0c8478e354bd52c2931c6e84e Mon Sep 17 00:00:00 2001 From: Nell Shamrell-Harrington Date: Sat, 5 Dec 2020 19:33:46 -0800 Subject: [PATCH 38/47] Update text/0000-async-stream.md Co-authored-by: Tyler Mandry --- text/0000-async-stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index 51c15014a9f..515277ab486 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -872,7 +872,7 @@ default impl PinIterator for T { .. } Pinning also applies to the design of AsyncRead/AsyncWrite, which currently uses Pin even through there is no clear plan to make them implemented with generator type syntax. The asyncification of a signature is currently understood as pinned receiver + context arg + return poll. -Another key difference between `Iterators` and `Streams` is that that futures are ultimately passed to some executor API like spawn which expects a static future. To achieve that, the futures contain all the state they need and references are internal to that state. Iterators are almost never required to be 'static by the APIs that consume them. +Another key difference between `Iterators` and `Streams` is that futures are ultimately passed to some executor API like spawn which expects a `'static` future. To achieve that, the futures contain all the state they need and references are internal to that state. Iterators are almost never required to be `'static` by the APIs that consume them. It is, admittedly, somewhat confusing to have Async generators require Pinning and Iterator generators to not require pinning, users may feel they are creating code in an unnatural way when using the Async generators. This will need to be discussed more when generators are proposed in the future. From 88f1e1a82806224a682ec977b06226e6e583f0e6 Mon Sep 17 00:00:00 2001 From: Nell Shamrell-Harrington Date: Sat, 5 Dec 2020 19:34:10 -0800 Subject: [PATCH 39/47] Update text/0000-async-stream.md Co-authored-by: Jon Gjengset --- text/0000-async-stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index 515277ab486..bc00ef4ab25 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -170,7 +170,7 @@ There are a number of useful extension methods that are available, for example, in the `futures-util` crate, but we have not included them because they involve closure arguments, and we have not yet finalized the design of async closures. -However, the core `poll_next` method alone is extremely unergonomic. You can't even iterate +However, the core `poll_next` method is unergonomic; it does not let you iterate over the items coming out of the stream. Therefore, we include a few minimal convenience methods that are not dependent on any unstable features, such as `next`. From dad7612926bc5d3bdb1bd044234e7a9788f2f57d Mon Sep 17 00:00:00 2001 From: Nell Shamrell-Harrington Date: Sat, 5 Dec 2020 19:35:00 -0800 Subject: [PATCH 40/47] Update text/0000-async-stream.md Co-authored-by: Tyler Mandry --- text/0000-async-stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index bc00ef4ab25..85e234a85d0 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -772,7 +772,7 @@ is out of the scope of this particular RFC. [generator syntax]: #generator-syntax In the future, we may wish to introduce a new form of function - -`gen fn` in iterators and `async gen` in async code that +`gen fn` in iterators and `async gen fn` in async code that can contain `yield` statements. Calling such a function would yield a `impl Iterator` or `impl Stream`, for sync and async respectively. Given an "attached" or "borrowed" stream, the generator From d8c8d00074be6ae6dc19a86ae72ab87ac554d22a Mon Sep 17 00:00:00 2001 From: Nell Shamrell-Harrington Date: Sat, 5 Dec 2020 19:36:45 -0800 Subject: [PATCH 41/47] Update text/0000-async-stream.md Co-authored-by: Tyler Mandry --- text/0000-async-stream.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index 85e234a85d0..d06923524c8 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -776,9 +776,9 @@ In the future, we may wish to introduce a new form of function - can contain `yield` statements. Calling such a function would yield a `impl Iterator` or `impl Stream`, for sync and async respectively. Given an "attached" or "borrowed" stream, the generator -yield could return references to local variables. Given a "detached" -or "owned" stream, the generator yield could return things -that you own or things that were borrowed from your caller. +could yield references to local variables. Given a "detached" +or "owned" stream, the generator could yield owned values +or things that were borrowed from its caller. ### In Iterators From 1ae82769ba58be7074f4aeae5511b8a2ed1b7ca8 Mon Sep 17 00:00:00 2001 From: Nell Shamrell Date: Sat, 5 Dec 2020 19:38:26 -0800 Subject: [PATCH 42/47] add in draft of LendingStream trait Signed-off-by: Nell Shamrell --- text/0000-async-stream.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index d06923524c8..dde93b1abca 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -703,6 +703,15 @@ such as the ability to spawn off futures processing each item in parallel. ### Potential Lending Stream Trait ```rust +trait LendingStream<'s> { + type Item<'a> where 's: 'a; + + fn poll_next<'a>( + self: Pin<&'a mut Self>, + cx: &mut Context<'_>, + ) -> Poll>>; +} + impl LendingStream for S where S: Stream, From fef7748a00aa3f9aebfdc10479ca610ddf35f9b9 Mon Sep 17 00:00:00 2001 From: Nell Shamrell-Harrington Date: Tue, 8 Dec 2020 09:54:26 -0800 Subject: [PATCH 43/47] Update text/0000-async-stream.md Co-authored-by: Tyler Mandry --- text/0000-async-stream.md | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index dde93b1abca..17110953ac7 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -885,14 +885,19 @@ Another key difference between `Iterators` and `Streams` is that futures are ult It is, admittedly, somewhat confusing to have Async generators require Pinning and Iterator generators to not require pinning, users may feel they are creating code in an unnatural way when using the Async generators. This will need to be discussed more when generators are proposed in the future. -### Yielding +### Disallowing self-borrowing generators in `gen fn` -It would be useful to be able to yield from inside a for loop, as long as the for loop is -over a borrowed input and not something owned by the stack frame. +Another option is to make the generators returned by `gen fn` always be `Unpin` so that the user doesn't have to think about pinning unless they're already in an async context. -In the spirit of experimentation, boats has written the [propane](https://github.com/withoutboats/propane) +In the spirit of experimentation, boats has written the [propane] crate. This crate includes a `#[propane] fn` that changes the function signature to return `impl Iterator` and lets you `yield`. The non-async version uses -`static generator` that is currently in nightly only. +(nightly-only) generators which are non-`static`, disallowing self-borrowing. +In other words, you can't hold a reference to something on the stack across a `yield`. + +This should still allow yielding from inside a for loop, as long as the for loop is +over a borrowed input and not something owned by the stack frame. + +[propane]: https://github.com/withoutboats/propane Further designing generator functions is out of the scope of this RFC. From 881b162db82836d60d94034bc21f85227e95ae79 Mon Sep 17 00:00:00 2001 From: Nell Shamrell-Harrington Date: Wed, 9 Dec 2020 07:37:46 -0800 Subject: [PATCH 44/47] Update text/0000-async-stream.md Co-authored-by: Tyler Mandry --- text/0000-async-stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index 17110953ac7..85492977917 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -346,7 +346,7 @@ where impl Stream for Pin

where P: DerefMut + Unpin, - T::Target: Stream, + T: Stream, { type Item = ::Item; } From 1e2d66937acc05a24bd36df4c3b2cca2d2a51941 Mon Sep 17 00:00:00 2001 From: Nell Shamrell Date: Mon, 14 Dec 2020 18:35:48 -0800 Subject: [PATCH 45/47] revert wording around streams resuming iteration Signed-off-by: Nell Shamrell --- text/0000-async-stream.md | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index 85492977917..25ece6ab77a 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -102,11 +102,8 @@ The arguments to `poll_next` match that of the [`Future::poll`] method: [context]: https://doc.rust-lang.org/std/task/struct.Context.html [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html -As defined in the [`Future` docs](https://doc.rust-lang.org/stable/std/future/trait.Future.html): - -Once a future has completed (returned Ready from poll), calling its poll method again may panic, block forever, or cause other kinds of problems; the Future trait places no requirements on the effects of such a call. However, as the poll method is not marked unsafe, Rust's usual rules apply: calls must never cause undefined behavior (memory corruption, incorrect use of unsafe functions, or the like), regardless of the future's state. - -This RFC takes a similar approach to Streams. In general, once a Stream returns `None`, clients should not poll it again. However, individual streams may choose to resume iteration if the user wishes them to, in which case the stream may or may not eventually yield `Some(Item)` again at some point. +Individual streams may choose to resume iteration, and so calling [`next`] +again may or may not eventually yield `Some(Item)` again at some point. ### Usage From d749f416d79a07cea6c4970bba4ff2919a40a0f0 Mon Sep 17 00:00:00 2001 From: Nell Shamrell Date: Mon, 14 Dec 2020 18:40:53 -0800 Subject: [PATCH 46/47] removes unnecessary link formatting Signed-off-by: Nell Shamrell --- text/0000-async-stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index 25ece6ab77a..5832710e15e 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -110,7 +110,7 @@ again may or may not eventually yield `Some(Item)` again at some point. A user could create a stream as follows (Example taken from @yoshuawuyt's [implementation pull request](https://github.com/rust-lang/rust/pull/79023)). Creating a stream involves two steps: creating a `struct` to - hold the stream's state, and then implementing [`Stream`] for that + hold the stream's state, and then implementing `Stream` for that `struct`. Let's make a stream named `Counter` which counts from `1` to `5`: From de606aaf5f7679d0eb88ae97ba6ccdd3d6fb767d Mon Sep 17 00:00:00 2001 From: Nell Shamrell Date: Sat, 9 Jan 2021 16:49:43 -0800 Subject: [PATCH 47/47] removes the next method from this RFC and moves it to the future possiblities section Signed-off-by: Nell Shamrell --- text/0000-async-stream.md | 282 ++++++++++++-------------------------- 1 file changed, 87 insertions(+), 195 deletions(-) diff --git a/text/0000-async-stream.md b/text/0000-async-stream.md index 5832710e15e..41c5d1a8690 100644 --- a/text/0000-async-stream.md +++ b/text/0000-async-stream.md @@ -21,12 +21,7 @@ People can do this currently using the `Stream` trait defined in the [futures](https://crates.io/crates/futures) crate. However, we would like to add `Stream` to the standard library. -In addition to adding the `Stream` trait to the standard library, we also want to provide basic -ergonomic methods required to use streams effectively. These includes the `next` method. Without this method, `Streams` would feel much more -difficult to use. Were we to not include it, users would likely immediately reach out -for them either through writing their own version or using an external crate. - -Including `Stream` in the standard library would also clarify the stability guarantees of the trait. For example, if [Tokio](https://tokio.rs/) +Including `Stream` in the standard library would clarify the stability guarantees of the trait. For example, if [Tokio](https://tokio.rs/) wishes to declare a [5 year stability period](http://smallcultfollowing.com/babysteps/blog/2020/02/11/async-interview-6-eliza-weisman/#communicating-stability), having the `Stream` trait in the standard library means there are no concerns about the trait changing during that time ([citation](http://smallcultfollowing.com/babysteps/blog/2019/12/23/async-interview-3-carl-lerche/#what-should-we-do-next-stabilize-stream)). @@ -51,7 +46,7 @@ This includes a trait for producing streams and a trait for consuming streams. A "stream" is the async version of an [iterator]. -The `Iterator` trait includes a `next` method, which computes and returns the next item in the sequence. The `Stream` trait includes two core methods for defining and interacting with streams - `poll_next` and `next`. +The `Iterator` trait includes a `next` method, which computes and returns the next item in the sequence. The `Stream` trait includes the `poll_next` method to assist with defining a stream. In the future, we should add a `next` method for use when consuming and interacting with a stream (see the [Future possiblilities](future-possibilities) section later in this RFC). ## poll_next method @@ -78,12 +73,6 @@ pub trait Stream { fn size_hint(&self) -> (usize, Option) { (0, None) } - - // Convenience method (covered later on in the RFC): - fn next(&mut self) -> Next<'_, Self> - where - Self: Unpin - { .. } } ``` @@ -102,9 +91,6 @@ The arguments to `poll_next` match that of the [`Future::poll`] method: [context]: https://doc.rust-lang.org/std/task/struct.Context.html [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html -Individual streams may choose to resume iteration, and so calling [`next`] -again may or may not eventually yield `Some(Item)` again at some point. - ### Usage A user could create a stream as follows (Example taken from @yoshuawuyt's [implementation pull request](https://github.com/rust-lang/rust/pull/79023)). @@ -158,169 +144,6 @@ impl Stream for Counter { } ``` -## next method - -We should also implement a next method, similar to [the implementation in the futures-util crate](https://docs.rs/futures-util/0.3.5/src/futures_util/stream/stream/next.rs.html#10-12). - -In general, we have purposefully kept the core trait definition minimal. -There are a number of useful extension methods that are available, for example, -in the `futures-util` crate, but we have not included them because they involve -closure arguments, and we have not yet finalized the design of async closures. - -However, the core `poll_next` method is unergonomic; it does not let you iterate -over the items coming out of the stream. Therefore, we include a few minimal -convenience methods that are not dependent on any unstable features, such as `next`. - -As @yoshuawuyts states in their [pull request which adds `core::stream::Stream` to the standard library](https://github.com/rust-lang/rust/pull/79023): - -Unlike `Iterator`, `Stream` makes a distinction between the `poll_next` -method which is used when implementing a `Stream`, and the `next` method -which is used when consuming a stream. Consumers of `Stream` only need to -consider `next`, which when called, returns a future which yields -`Option`. - -The future returned by `next` will yield `Some(Item)` as long as there are -elements, and once they've all been exhausted, will yield `None` to indicate -that iteration is finished. If we're waiting on something asynchronous to -resolve, the future will wait until the stream is ready to yield again. - -As defined in the [`Future` docs](https://doc.rust-lang.org/stable/std/future/trait.Future.html): - -Once a future has completed (returned Ready from poll), calling its poll method again may panic, block forever, or cause other kinds of problems; the Future trait places no requirements on the effects of such a call. However, as the poll method is not marked unsafe, Rust's usual rules apply: calls must never cause undefined behavior (memory corruption, incorrect use of unsafe functions, or the like), regardless of the future's state. - -This is similar to the `Future` trait. The `Future::poll` method is rarely called -directly, it is almost always used to implement other Futures. Interacting -with futures is done through `async/await`. - -We need something like the `next()` method in order to iterate over the stream directly in an `async` block or function. It is essentially an adapter from `Stream` to `Future`. - -```rust -/// A future that advances the stream and returns the next value. -/// -/// This `struct` is created by the [`next`] method on [`Stream`]. See its -/// documentation for more. -/// -/// [`next`]: trait.Stream.html#method.next -/// [`Stream`]: trait.Stream.html -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Next<'a, S: ?Sized> { - stream: &'a mut S, -} - -impl Unpin for Next<'_, St> {} - -impl<'a, St: ?Sized + Stream + Unpin> Next<'a, St> { - pub(super) fn new(stream: &'a mut St) -> Self { - Next { stream } - } -} - -impl Future for Next<'_, St> { - type Output = Option; - - fn poll( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll { - Pin::new(&mut *self.stream).poll_next(cx) - } -} -``` - -This would allow a user to await on a future: - -```rust -while let Some(v) = stream.next().await { - -} -``` - -We could also consider adding a `try_next` method, allowing -a user to write: - -```rust -while let Some(x) = s.try_next().await? -``` - -But this could also be written as: - -```rust -while let Some(x) = s.next().await.transpose()? -``` - -One thing to note, if a user is using an older version of `futures-util`, -they would experience ambiguity when trying to use the `next` method that -is added to the standard library (and redirected to from `futures-core`). - -This can be done as a non-breaking change, but would require everyone to -upgrade `futures`. We will want to create a transition plan on what this -means for users and pick the timing carefully. We may be able to ease this -somewhat with a custom compiler error message. - -### Usage - -Continuing the example of `Stream` implemented on a struct called `Counter`, the user would interact with the stream like so: - -```rust -let mut counter = Counter::new(); - -let x = counter.next().await.unwrap(); -println!("{}", x); - -let x = counter.next().await.unwrap(); -println!("{}", x); - -let x = counter.next().await.unwrap(); -println!("{}", x); - -let x = counter.next().await.unwrap(); -println!("{}", x); - -let x = counter.next().await.unwrap(); -println!("{}", x); -# -} -``` - -This would print `1` through `5`, each on their own line. - -### Why does next require Self:Unpin? - -When drafting this RFC, there was a [good deal of discussion](https://github.com/rust-lang/wg-async-foundations/pull/15#discussion_r452482084) around why the `next` method requires `Self:Unpin`. - -In particular, there was concern around how this would affect the use of generators in the future. Generators are discussed in the [Future possiblilities](future-possibilities) section later in this RFC. - -To understand this, it helps to take a closer look at the definition of `Next` in the [futures-util crate](https://docs.rs/futures-util/0.3.5/src/futures_util/stream/stream/next.rs.html#10-12). - -```rust -pub struct Next<'a, St: ?Sized> { - stream: &'a mut St, -} -``` -Since `Stream::poll_next` takes a pinned reference, the next future needs `S` to be `Unpin` in order to safely construct a `Pin<&mut S>` from a `&mut S`. - -An alternative approach we could take would be to have the `next` method take `Pin<&mut S>`, rather than `&mut S`. However, this would require pinning even when the type is `Unpin`. The current approach requires pinning only when the type is not `Unpin`. - -We currently do see some `!Unpin` streams in practice (including in the [futures-intrusive crate](https://github.com/Matthias247/futures-intrusive/blob/master/src/channel/mpmc.rs#L565-L625)). We also see `stream.then(|_| async {})` resulting in an `!Unpin` stream. Where `!Unpin` streams will become important is when we introduce async generators, as discussed in [future-possibilities]. - -This could potentially be addressed by using the [pin_mut!](https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.1/futures/macro.pin_mut.html) macro in the `futures` crate (it is also included in the [pin-utils](https://docs.rs/pin-utils/0.1.0/pin_utils/macro.pin_mut.html) crate) or by using [Box::pin](https://doc.rust-lang.org/std/boxed/struct.Box.html#method.pin) to pin the stream. - -```rust -let stream = non_unpin_stream(); -pin_mut!(stream); -while let Some(item) = stream.next().await { - ... -} -``` - -```rust -let stream = Box::pin(non_unpin_stream()); -while let Some(item) = stream.next().await { - ... -} -``` - ## Initial impls There are a number of simple "bridge" impls that are also provided: @@ -406,9 +229,92 @@ some other naming scheme that can scale up to other future additions, such as io # Future possibilities [future-possibilities]: #future-possibilities -## Convenience methods +## Next method + +While users will be able to implement a `Stream` as defined in this RFC, they will not have a way to interact with it in the core library. As soon as we figure out a way to do it in an object safe manner, we should add a `next` method either in the `Stream` trait or elsewhere. + +The `Iterator` trait includes a `next` method, which computes and returns the next item in the sequence. We should also implement a `next` method for `Stream`, similar to [the implementation in the futures-util crate](https://docs.rs/futures-util/0.3.5/src/futures_util/stream/stream/next.rs.html#10-12). + +The core `poll_next` method is unergonomic; it does not let you iterate +over the items coming out of the stream. Therefore, we include a few minimal +convenience methods that are not dependent on any unstable features, such as `next`. + +As @yoshuawuyts states in their [pull request which adds `core::stream::Stream` to the standard library](https://github.com/rust-lang/rust/pull/79023): + +Unlike `Iterator`, `Stream` makes a distinction between the `poll_next` +method which is used when implementing a `Stream`, and the `next` method +which is used when consuming a stream. Consumers of `Stream` only need to +consider `next`, which when called, returns a future which yields +`Option`. + +The future returned by `next` will yield `Some(Item)` as long as there are +elements, and once they've all been exhausted, will yield `None` to indicate +that iteration is finished. If we're waiting on something asynchronous to +resolve, the future will wait until the stream is ready to yield again. + +As defined in the [`Future` docs](https://doc.rust-lang.org/stable/std/future/trait.Future.html): -The `Iterator` trait defines a number of useful combinators, like +Once a future has completed (returned Ready from poll), calling its poll method again may panic, block forever, or cause other kinds of problems; the Future trait places no requirements on the effects of such a call. However, as the poll method is not marked unsafe, Rust's usual rules apply: calls must never cause undefined behavior (memory corruption, incorrect use of unsafe functions, or the like), regardless of the future's state. + +This is similar to the `Future` trait. The `Future::poll` method is rarely called +directly, it is almost always used to implement other Futures. Interacting +with futures is done through `async/await`. + +We need something like the `next()` method in order to iterate over the stream directly in an `async` block or function. It is essentially an adapter from `Stream` to `Future`. + +This would allow a user to await on a future: + +```rust +while let Some(v) = stream.next().await { + +} +``` + +We could also consider adding a `try_next` method, allowing +a user to write: + +```rust +while let Some(x) = s.try_next().await? +``` + +But this could also be written as: + +```rust +while let Some(x) = s.next().await.transpose()? +``` + +### More Usage Examples + +Using the example of `Stream` implemented on a struct called `Counter`, the user would interact with the stream like so: + +```rust +let mut counter = Counter::new(); + +let x = counter.next().await.unwrap(); +println!("{}", x); + +let x = counter.next().await.unwrap(); +println!("{}", x); + +let x = counter.next().await.unwrap(); +println!("{}", x); + +let x = counter.next().await.unwrap(); +println!("{}", x); + +let x = counter.next().await.unwrap(); +println!("{}", x); +# +} +``` + +This would print `1` through `5`, each on their own line. + +An earlier draft of the RFC prescribed an implementation of the `next` method on the `Stream` trait. Unfortunately, as detailed in [this comment](https://github.com/rust-lang/rust/pull/79023#discussion_r547425181), it made the stream non-object safe. More experimentation is required - and it may need to be an unstable language feature for more testing before it can be added to core. + +## More Convenience methods + +The `Iterator` trait also defines a number of useful combinators, like `map`. The `Stream` trait being proposed here does not include any such conveniences. Instead, they are available via extension traits, such as the [`StreamExt`] trait offered by the [`futures`] crate. @@ -829,17 +735,6 @@ pub trait Iterator { fn next(&mut self) -> Option; } ``` -And then compare it to the proposed Stream `next` method: - -```rust -pub trait Stream { - type Item; - - fn next(&mut self) -> Next<'_, Self> - where - Self: Unpin; -} -``` Iterator does not require pinning its core next method. In order for a `gen fn` to operate with the Iterator ecosystem, there must be some kind of initial pinning step that converts its result into an iterator. This will be tricky, since you can't return a pinned value except by boxing. @@ -864,11 +759,8 @@ For example: ```rust trait PinIterator { type Item; - fn next(self: Pin<&mut Self>) -> Self::Item; - // combinators can go here (duplicating Iterator for the most part) } impl + DerefMut> Iterator for Pin

{ - type Item = ::Item; fn next(&mut self) -> Self::Item { self.as_mut().next() } }