Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit e1fb7e7

Browse files
committedJul 31, 2017
Implement stream::poll_fn
1 parent 3d4f5ae commit e1fb7e7

File tree

2 files changed

+152
-67
lines changed

2 files changed

+152
-67
lines changed
 

‎src/stream/mod.rs

+103-67
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ mod merge;
4343
mod once;
4444
mod or_else;
4545
mod peek;
46+
mod poll_fn;
4647
mod select;
4748
mod skip;
4849
mod skip_while;
@@ -70,6 +71,7 @@ pub use self::merge::{Merge, MergedItem};
7071
pub use self::once::{Once, once};
7172
pub use self::or_else::OrElse;
7273
pub use self::peek::Peekable;
74+
pub use self::poll_fn::{PollFn, poll_fn};
7375
pub use self::select::Select;
7476
pub use self::skip::Skip;
7577
pub use self::skip_while::SkipWhile;
@@ -79,7 +81,7 @@ pub use self::then::Then;
7981
pub use self::unfold::{Unfold, unfold};
8082
pub use self::zip::Zip;
8183
pub use self::forward::Forward;
82-
use sink::{Sink};
84+
use sink::Sink;
8385

8486
if_std! {
8587
use std;
@@ -223,7 +225,8 @@ pub trait Stream {
223225
/// function panics, panics will be propagated to the caller of `next`.
224226
#[cfg(feature = "use_std")]
225227
fn wait(self) -> Wait<Self>
226-
where Self: Sized
228+
where
229+
Self: Sized,
227230
{
228231
wait::new(self)
229232
}
@@ -250,7 +253,8 @@ pub trait Stream {
250253
/// ```
251254
#[cfg(feature = "use_std")]
252255
fn boxed(self) -> BoxStream<Self::Item, Self::Error>
253-
where Self: Sized + Send + 'static,
256+
where
257+
Self: Sized + Send + 'static,
254258
{
255259
::std::boxed::Box::new(self)
256260
}
@@ -265,7 +269,8 @@ pub trait Stream {
265269
/// The returned future can be used to compose streams and futures together by
266270
/// placing everything into the "world of futures".
267271
fn into_future(self) -> StreamFuture<Self>
268-
where Self: Sized
272+
where
273+
Self: Sized,
269274
{
270275
future::new(self)
271276
}
@@ -290,8 +295,9 @@ pub trait Stream {
290295
/// let rx = rx.map(|x| x + 3);
291296
/// ```
292297
fn map<U, F>(self, f: F) -> Map<Self, F>
293-
where F: FnMut(Self::Item) -> U,
294-
Self: Sized
298+
where
299+
F: FnMut(Self::Item) -> U,
300+
Self: Sized,
295301
{
296302
map::new(self, f)
297303
}
@@ -316,8 +322,9 @@ pub trait Stream {
316322
/// let rx = rx.map_err(|()| 3);
317323
/// ```
318324
fn map_err<U, F>(self, f: F) -> MapErr<Self, F>
319-
where F: FnMut(Self::Error) -> U,
320-
Self: Sized
325+
where
326+
F: FnMut(Self::Error) -> U,
327+
Self: Sized,
321328
{
322329
map_err::new(self, f)
323330
}
@@ -346,8 +353,9 @@ pub trait Stream {
346353
/// let evens = rx.filter(|x| x % 0 == 2);
347354
/// ```
348355
fn filter<F>(self, f: F) -> Filter<Self, F>
349-
where F: FnMut(&Self::Item) -> bool,
350-
Self: Sized
356+
where
357+
F: FnMut(&Self::Item) -> bool,
358+
Self: Sized,
351359
{
352360
filter::new(self, f)
353361
}
@@ -382,8 +390,9 @@ pub trait Stream {
382390
/// });
383391
/// ```
384392
fn filter_map<F, B>(self, f: F) -> FilterMap<Self, F>
385-
where F: FnMut(Self::Item) -> Option<B>,
386-
Self: Sized
393+
where
394+
F: FnMut(Self::Item) -> Option<B>,
395+
Self: Sized,
387396
{
388397
filter_map::new(self, f)
389398
}
@@ -421,9 +430,10 @@ pub trait Stream {
421430
/// });
422431
/// ```
423432
fn then<F, U>(self, f: F) -> Then<Self, F, U>
424-
where F: FnMut(Result<Self::Item, Self::Error>) -> U,
425-
U: IntoFuture,
426-
Self: Sized
433+
where
434+
F: FnMut(Result<Self::Item, Self::Error>) -> U,
435+
U: IntoFuture,
436+
Self: Sized,
427437
{
428438
then::new(self, f)
429439
}
@@ -465,9 +475,10 @@ pub trait Stream {
465475
/// });
466476
/// ```
467477
fn and_then<F, U>(self, f: F) -> AndThen<Self, F, U>
468-
where F: FnMut(Self::Item) -> U,
469-
U: IntoFuture<Error = Self::Error>,
470-
Self: Sized
478+
where
479+
F: FnMut(Self::Item) -> U,
480+
U: IntoFuture<Error = Self::Error>,
481+
Self: Sized,
471482
{
472483
and_then::new(self, f)
473484
}
@@ -492,9 +503,10 @@ pub trait Stream {
492503
/// Note that this function consumes the receiving stream and returns a
493504
/// wrapped version of it.
494505
fn or_else<F, U>(self, f: F) -> OrElse<Self, F, U>
495-
where F: FnMut(Self::Error) -> U,
496-
U: IntoFuture<Item = Self::Item>,
497-
Self: Sized
506+
where
507+
F: FnMut(Self::Error) -> U,
508+
U: IntoFuture<Item = Self::Item>,
509+
Self: Sized,
498510
{
499511
or_else::new(self, f)
500512
}
@@ -533,7 +545,8 @@ pub trait Stream {
533545
/// ```
534546
#[cfg(feature = "use_std")]
535547
fn collect(self) -> Collect<Self>
536-
where Self: Sized
548+
where
549+
Self: Sized,
537550
{
538551
collect::new(self)
539552
}
@@ -569,8 +582,9 @@ pub trait Stream {
569582
/// assert_eq!(result.wait(), Ok(vec![7, 8, 9, 4, 5, 6, 1, 2, 3]));
570583
/// ```
571584
fn concat2(self) -> Concat2<Self>
572-
where Self: Sized,
573-
Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,
585+
where
586+
Self: Sized,
587+
Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,
574588
{
575589
concat::new2(self)
576590
}
@@ -606,10 +620,11 @@ pub trait Stream {
606620
///
607621
/// It's important to note that this function will panic if the stream
608622
/// is empty, which is the reason for its deprecation.
609-
#[deprecated(since="0.1.14", note="please use `Stream::concat2` instead")]
623+
#[deprecated(since = "0.1.14", note = "please use `Stream::concat2` instead")]
610624
fn concat(self) -> Concat<Self>
611-
where Self: Sized,
612-
Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator,
625+
where
626+
Self: Sized,
627+
Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator,
613628
{
614629
concat::new(self)
615630
}
@@ -637,10 +652,11 @@ pub trait Stream {
637652
/// assert_eq!(sum.wait(), Ok(15));
638653
/// ```
639654
fn fold<F, T, Fut>(self, init: T, f: F) -> Fold<Self, F, Fut, T>
640-
where F: FnMut(T, Self::Item) -> Fut,
641-
Fut: IntoFuture<Item = T>,
642-
Self::Error: From<Fut::Error>,
643-
Self: Sized
655+
where
656+
F: FnMut(T, Self::Item) -> Fut,
657+
Fut: IntoFuture<Item = T>,
658+
Self::Error: From<Fut::Error>,
659+
Self: Sized,
644660
{
645661
fold::new(self, f, init)
646662
}
@@ -679,9 +695,10 @@ pub trait Stream {
679695
/// assert_eq!(result.wait(), Ok(vec![1, 2, 3, 4]));
680696
/// ```
681697
fn flatten(self) -> Flatten<Self>
682-
where Self::Item: Stream,
683-
<Self::Item as Stream>::Error: From<Self::Error>,
684-
Self: Sized
698+
where
699+
Self::Item: Stream,
700+
<Self::Item as Stream>::Error: From<Self::Error>,
701+
Self: Sized,
685702
{
686703
flatten::new(self)
687704
}
@@ -694,9 +711,10 @@ pub trait Stream {
694711
/// returns false all future elements will be returned from the underlying
695712
/// stream.
696713
fn skip_while<P, R>(self, pred: P) -> SkipWhile<Self, P, R>
697-
where P: FnMut(&Self::Item) -> R,
698-
R: IntoFuture<Item=bool, Error=Self::Error>,
699-
Self: Sized
714+
where
715+
P: FnMut(&Self::Item) -> R,
716+
R: IntoFuture<Item = bool, Error = Self::Error>,
717+
Self: Sized,
700718
{
701719
skip_while::new(self, pred)
702720
}
@@ -708,9 +726,10 @@ pub trait Stream {
708726
/// stream until the `predicate` resolves to `false`. Once one element
709727
/// returns false it will always return that the stream is done.
710728
fn take_while<P, R>(self, pred: P) -> TakeWhile<Self, P, R>
711-
where P: FnMut(&Self::Item) -> R,
712-
R: IntoFuture<Item=bool, Error=Self::Error>,
713-
Self: Sized
729+
where
730+
P: FnMut(&Self::Item) -> R,
731+
R: IntoFuture<Item = bool, Error = Self::Error>,
732+
Self: Sized,
714733
{
715734
take_while::new(self, pred)
716735
}
@@ -727,9 +746,10 @@ pub trait Stream {
727746
/// closure will cause iteration to be halted immediately and the future
728747
/// will resolve to that error.
729748
fn for_each<F, U>(self, f: F) -> ForEach<Self, F, U>
730-
where F: FnMut(Self::Item) -> U,
731-
U: IntoFuture<Item=(), Error = Self::Error>,
732-
Self: Sized
749+
where
750+
F: FnMut(Self::Item) -> U,
751+
U: IntoFuture<Item = (), Error = Self::Error>,
752+
Self: Sized,
733753
{
734754
for_each::new(self, f)
735755
}
@@ -746,7 +766,8 @@ pub trait Stream {
746766
/// Note that this function consumes the receiving stream and returns a
747767
/// wrapped version of it.
748768
fn from_err<E: From<Self::Error>>(self) -> FromErr<Self, E>
749-
where Self: Sized,
769+
where
770+
Self: Sized,
750771
{
751772
from_err::new(self)
752773
}
@@ -762,7 +783,8 @@ pub trait Stream {
762783
/// items is reached, are passed through and do not affect the total number
763784
/// of items taken.
764785
fn take(self, amt: u64) -> Take<Self>
765-
where Self: Sized
786+
where
787+
Self: Sized,
766788
{
767789
take::new(self, amt)
768790
}
@@ -777,7 +799,8 @@ pub trait Stream {
777799
/// All errors yielded from underlying stream are passed through and do not
778800
/// affect the total number of items skipped.
779801
fn skip(self, amt: u64) -> Skip<Self>
780-
where Self: Sized
802+
where
803+
Self: Sized,
781804
{
782805
skip::new(self, amt)
783806
}
@@ -798,7 +821,8 @@ pub trait Stream {
798821
/// Also note that as soon as this stream returns `None` it will be dropped
799822
/// to reclaim resources associated with it.
800823
fn fuse(self) -> Fuse<Self>
801-
where Self: Sized
824+
where
825+
Self: Sized,
802826
{
803827
fuse::new(self)
804828
}
@@ -822,7 +846,8 @@ pub trait Stream {
822846
/// assert_eq!(sum, Ok(7));
823847
/// ```
824848
fn by_ref(&mut self) -> &mut Self
825-
where Self: Sized
849+
where
850+
Self: Sized,
826851
{
827852
self
828853
}
@@ -863,7 +888,8 @@ pub trait Stream {
863888
/// ```
864889
#[cfg(feature = "use_std")]
865890
fn catch_unwind(self) -> CatchUnwind<Self>
866-
where Self: Sized + std::panic::UnwindSafe
891+
where
892+
Self: Sized + std::panic::UnwindSafe,
867893
{
868894
catch_unwind::new(self)
869895
}
@@ -883,8 +909,9 @@ pub trait Stream {
883909
/// library is activated, and it is activated by default.
884910
#[cfg(feature = "use_std")]
885911
fn buffered(self, amt: usize) -> Buffered<Self>
886-
where Self::Item: IntoFuture<Error = <Self as Stream>::Error>,
887-
Self: Sized
912+
where
913+
Self::Item: IntoFuture<Error = <Self as Stream>::Error>,
914+
Self: Sized,
888915
{
889916
buffered::new(self, amt)
890917
}
@@ -904,8 +931,9 @@ pub trait Stream {
904931
/// library is activated, and it is activated by default.
905932
#[cfg(feature = "use_std")]
906933
fn buffer_unordered(self, amt: usize) -> BufferUnordered<Self>
907-
where Self::Item: IntoFuture<Error = <Self as Stream>::Error>,
908-
Self: Sized
934+
where
935+
Self::Item: IntoFuture<Error = <Self as Stream>::Error>,
936+
Self: Sized,
909937
{
910938
buffer_unordered::new(self, amt)
911939
}
@@ -916,8 +944,9 @@ pub trait Stream {
916944
/// streams as they become available. Errors, however, are not merged: you
917945
/// get at most one error at a time.
918946
fn merge<S>(self, other: S) -> Merge<Self, S>
919-
where S: Stream<Error = Self::Error>,
920-
Self: Sized,
947+
where
948+
S: Stream<Error = Self::Error>,
949+
Self: Sized,
921950
{
922951
merge::new(self, other)
923952
}
@@ -928,8 +957,9 @@ pub trait Stream {
928957
/// returns that pair. If an error happens, then that error will be returned
929958
/// immediately. If either stream ends then the zipped stream will also end.
930959
fn zip<S>(self, other: S) -> Zip<Self, S>
931-
where S: Stream<Error = Self::Error>,
932-
Self: Sized,
960+
where
961+
S: Stream<Error = Self::Error>,
962+
Self: Sized,
933963
{
934964
zip::new(self, other)
935965
}
@@ -954,8 +984,9 @@ pub trait Stream {
954984
/// assert_eq!(None, chain.next());
955985
/// ```
956986
fn chain<S>(self, other: S) -> Chain<Self, S>
957-
where S: Stream<Item = Self::Item, Error = Self::Error>,
958-
Self: Sized
987+
where
988+
S: Stream<Item = Self::Item, Error = Self::Error>,
989+
Self: Sized,
959990
{
960991
chain::new(self, other)
961992
}
@@ -964,7 +995,8 @@ pub trait Stream {
964995
///
965996
/// Calling `peek` returns a reference to the next item in the stream.
966997
fn peekable(self) -> Peekable<Self>
967-
where Self: Sized
998+
where
999+
Self: Sized,
9681000
{
9691001
peek::new(self)
9701002
}
@@ -991,7 +1023,8 @@ pub trait Stream {
9911023
/// This method will panic of `capacity` is zero.
9921024
#[cfg(feature = "use_std")]
9931025
fn chunks(self, capacity: usize) -> Chunks<Self>
994-
where Self: Sized
1026+
where
1027+
Self: Sized,
9951028
{
9961029
chunks::new(self, capacity)
9971030
}
@@ -1008,8 +1041,9 @@ pub trait Stream {
10081041
///
10091042
/// Error are passed through from either stream.
10101043
fn select<S>(self, other: S) -> Select<Self, S>
1011-
where S: Stream<Item = Self::Item, Error = Self::Error>,
1012-
Self: Sized,
1044+
where
1045+
S: Stream<Item = Self::Item, Error = Self::Error>,
1046+
Self: Sized,
10131047
{
10141048
select::new(self, other)
10151049
}
@@ -1029,9 +1063,10 @@ pub trait Stream {
10291063
///
10301064
/// On completion, the pair `(stream, sink)` is returned.
10311065
fn forward<S>(self, sink: S) -> Forward<Self, S>
1032-
where S: Sink<SinkItem = Self::Item>,
1033-
Self::Error: From<S::SinkError>,
1034-
Self: Sized
1066+
where
1067+
S: Sink<SinkItem = Self::Item>,
1068+
Self::Error: From<S::SinkError>,
1069+
Self: Sized,
10351070
{
10361071
forward::new(self, sink)
10371072
}
@@ -1047,7 +1082,8 @@ pub trait Stream {
10471082
/// library is activated, and it is activated by default.
10481083
#[cfg(feature = "use_std")]
10491084
fn split(self) -> (SplitSink<Self>, SplitStream<Self>)
1050-
where Self: super::sink::Sink + Sized
1085+
where
1086+
Self: super::sink::Sink + Sized,
10511087
{
10521088
split::split(self)
10531089
}

‎src/stream/poll_fn.rs

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
//! Definition of the `PollFn` combinator
2+
3+
use {Stream, Poll};
4+
5+
/// A stream which adapts a function returning `Poll`.
6+
///
7+
/// Created by the `poll_fn` function.
8+
#[derive(Debug)]
9+
#[must_use = "streams do nothing unless polled"]
10+
pub struct PollFn<F> {
11+
inner: F,
12+
}
13+
14+
/// Creates a new stream wrapping around a function returning `Poll`.
15+
///
16+
/// Polling the returned stream delegates to the wrapped function.
17+
///
18+
/// # Examples
19+
///
20+
/// ```
21+
/// use futures::stream::poll_fn;
22+
/// use futures::{Async, Poll};
23+
///
24+
/// let mut counter = 1usize;
25+
///
26+
/// let read_stream = poll_fn(move || -> Poll<String, std::error::Error> {
27+
/// if counter == 0 { return Ok(Async::Ready(None)); }
28+
/// counter -= 1;
29+
/// Ok(Async::Ready(Some("Hello, World!".to_owned())))
30+
/// });
31+
/// ```
32+
pub fn poll_fn<T, E, F>(f: F) -> PollFn<F>
33+
where
34+
F: FnMut() -> Poll<T, E>,
35+
{
36+
PollFn { inner: f }
37+
}
38+
39+
impl<T, E, F> Stream for PollFn<F>
40+
where
41+
F: FnMut() -> Poll<Option<T>, E>,
42+
{
43+
type Item = T;
44+
type Error = E;
45+
46+
fn poll(&mut self) -> Poll<Option<T>, E> {
47+
(self.inner)()
48+
}
49+
}

0 commit comments

Comments
 (0)
Please sign in to comment.