futures_util/stream/try_stream/
mod.rs

1//! Streams
2//!
//! This module contains a number of functions for working with `Stream`s
4//! that return `Result`s, allowing for short-circuiting computations.
5
6#[cfg(feature = "compat")]
7use crate::compat::Compat;
8use crate::fns::{
9    inspect_err_fn, inspect_ok_fn, into_fn, map_err_fn, map_ok_fn, InspectErrFn, InspectOkFn,
10    IntoFn, MapErrFn, MapOkFn,
11};
12use crate::future::assert_future;
13use crate::stream::assert_stream;
14use crate::stream::{Inspect, Map};
15#[cfg(feature = "alloc")]
16use alloc::vec::Vec;
17use core::pin::Pin;
18use futures_core::{
19    future::{Future, TryFuture},
20    stream::TryStream,
21    task::{Context, Poll},
22};
23
24mod and_then;
25#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
26pub use self::and_then::AndThen;
27
// NOTE(review): `delegate_all!` is defined outside this section of the file. It
// presumably generates the `ErrInto` wrapper type around `MapErr<St, IntoFn<E>>`
// and forwards the traits listed after the colon to that inner value, with the
// `New[...]` closure acting as the constructor — confirm against the macro.
delegate_all!(
    /// Stream for the [`err_into`](super::TryStreamExt::err_into) method.
    ErrInto<St, E>(
        MapErr<St, IntoFn<E>>
    ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St| MapErr::new(x, into_fn())]
);
34
// NOTE(review): `delegate_all!` is defined outside this section of the file. It
// presumably generates the `InspectOk` wrapper type around
// `Inspect<IntoStream<St>, InspectOkFn<F>>` and forwards the listed traits to it;
// the `New[...]` closure shows how the inner value is built from a stream `x`
// and a callback `f` — confirm against the macro.
delegate_all!(
    /// Stream for the [`inspect_ok`](super::TryStreamExt::inspect_ok) method.
    InspectOk<St, F>(
        Inspect<IntoStream<St>, InspectOkFn<F>>
    ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_ok_fn(f))]
);
41
// NOTE(review): `delegate_all!` is defined outside this section of the file. It
// presumably generates the `InspectErr` wrapper type around
// `Inspect<IntoStream<St>, InspectErrFn<F>>` and forwards the listed traits to it;
// the `New[...]` closure shows how the inner value is built from a stream `x`
// and a callback `f` — confirm against the macro.
delegate_all!(
    /// Stream for the [`inspect_err`](super::TryStreamExt::inspect_err) method.
    InspectErr<St, F>(
        Inspect<IntoStream<St>, InspectErrFn<F>>
    ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_err_fn(f))]
);
48
49mod into_stream;
50#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
51pub use self::into_stream::IntoStream;
52
// NOTE(review): `delegate_all!` is defined outside this section of the file. It
// presumably generates the `MapOk` wrapper type around
// `Map<IntoStream<St>, MapOkFn<F>>` and forwards the listed traits to it; the
// `New[...]` closure shows how the inner value is built from a stream `x` and a
// mapping function `f` — confirm against the macro.
delegate_all!(
    /// Stream for the [`map_ok`](super::TryStreamExt::map_ok) method.
    MapOk<St, F>(
        Map<IntoStream<St>, MapOkFn<F>>
    ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_ok_fn(f))]
);
59
// NOTE(review): `delegate_all!` is defined outside this section of the file. It
// presumably generates the `MapErr` wrapper type around
// `Map<IntoStream<St>, MapErrFn<F>>` and forwards the listed traits to it; the
// `New[...]` closure shows how the inner value is built from a stream `x` and a
// mapping function `f` — confirm against the macro. `ErrInto` above is declared
// in terms of this type.
delegate_all!(
    /// Stream for the [`map_err`](super::TryStreamExt::map_err) method.
    MapErr<St, F>(
        Map<IntoStream<St>, MapErrFn<F>>
    ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_err_fn(f))]
);
66
67mod or_else;
68#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
69pub use self::or_else::OrElse;
70
71mod try_next;
72#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
73pub use self::try_next::TryNext;
74
75mod try_for_each;
76#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
77pub use self::try_for_each::TryForEach;
78
79mod try_filter;
80#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
81pub use self::try_filter::TryFilter;
82
83mod try_filter_map;
84#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
85pub use self::try_filter_map::TryFilterMap;
86
87mod try_flatten;
88#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
89pub use self::try_flatten::TryFlatten;
90
91mod try_collect;
92#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
93pub use self::try_collect::TryCollect;
94
95mod try_concat;
96#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
97pub use self::try_concat::TryConcat;
98
99#[cfg(feature = "alloc")]
100mod try_chunks;
101#[cfg(feature = "alloc")]
102#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
103pub use self::try_chunks::{TryChunks, TryChunksError};
104
105mod try_fold;
106#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
107pub use self::try_fold::TryFold;
108
109mod try_unfold;
110#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
111pub use self::try_unfold::{try_unfold, TryUnfold};
112
113mod try_skip_while;
114#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
115pub use self::try_skip_while::TrySkipWhile;
116
117mod try_take_while;
118#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
119pub use self::try_take_while::TryTakeWhile;
120
121#[cfg(not(futures_no_atomic_cas))]
122#[cfg(feature = "alloc")]
123mod try_buffer_unordered;
124#[cfg(not(futures_no_atomic_cas))]
125#[cfg(feature = "alloc")]
126#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
127pub use self::try_buffer_unordered::TryBufferUnordered;
128
129#[cfg(not(futures_no_atomic_cas))]
130#[cfg(feature = "alloc")]
131mod try_buffered;
132#[cfg(not(futures_no_atomic_cas))]
133#[cfg(feature = "alloc")]
134#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
135pub use self::try_buffered::TryBuffered;
136
137#[cfg(not(futures_no_atomic_cas))]
138#[cfg(feature = "alloc")]
139mod try_for_each_concurrent;
140#[cfg(not(futures_no_atomic_cas))]
141#[cfg(feature = "alloc")]
142#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
143pub use self::try_for_each_concurrent::TryForEachConcurrent;
144
145#[cfg(feature = "io")]
146#[cfg(feature = "std")]
147mod into_async_read;
148#[cfg(feature = "io")]
149#[cfg_attr(docsrs, doc(cfg(feature = "io")))]
150#[cfg(feature = "std")]
151#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
152pub use self::into_async_read::IntoAsyncRead;
153
154impl<S: ?Sized + TryStream> TryStreamExt for S {}
155
156/// Adapters specific to `Result`-returning streams
157pub trait TryStreamExt: TryStream {
158    /// Wraps the current stream in a new stream which converts the error type
159    /// into the one provided.
160    ///
161    /// # Examples
162    ///
163    /// ```
164    /// # futures::executor::block_on(async {
165    /// use futures::stream::{self, TryStreamExt};
166    ///
167    /// let mut stream =
168    ///     stream::iter(vec![Ok(()), Err(5i32)])
169    ///         .err_into::<i64>();
170    ///
171    /// assert_eq!(stream.try_next().await, Ok(Some(())));
172    /// assert_eq!(stream.try_next().await, Err(5i64));
173    /// # })
174    /// ```
175    fn err_into<E>(self) -> ErrInto<Self, E>
176    where
177        Self: Sized,
178        Self::Error: Into<E>,
179    {
180        assert_stream::<Result<Self::Ok, E>, _>(ErrInto::new(self))
181    }
182
183    /// Wraps the current stream in a new stream which maps the success value
184    /// using the provided closure.
185    ///
186    /// # Examples
187    ///
188    /// ```
189    /// # futures::executor::block_on(async {
190    /// use futures::stream::{self, TryStreamExt};
191    ///
192    /// let mut stream =
193    ///     stream::iter(vec![Ok(5), Err(0)])
194    ///         .map_ok(|x| x + 2);
195    ///
196    /// assert_eq!(stream.try_next().await, Ok(Some(7)));
197    /// assert_eq!(stream.try_next().await, Err(0));
198    /// # })
199    /// ```
200    fn map_ok<T, F>(self, f: F) -> MapOk<Self, F>
201    where
202        Self: Sized,
203        F: FnMut(Self::Ok) -> T,
204    {
205        assert_stream::<Result<T, Self::Error>, _>(MapOk::new(self, f))
206    }
207
208    /// Wraps the current stream in a new stream which maps the error value
209    /// using the provided closure.
210    ///
211    /// # Examples
212    ///
213    /// ```
214    /// # futures::executor::block_on(async {
215    /// use futures::stream::{self, TryStreamExt};
216    ///
217    /// let mut stream =
218    ///     stream::iter(vec![Ok(5), Err(0)])
219    ///         .map_err(|x| x + 2);
220    ///
221    /// assert_eq!(stream.try_next().await, Ok(Some(5)));
222    /// assert_eq!(stream.try_next().await, Err(2));
223    /// # })
224    /// ```
225    fn map_err<E, F>(self, f: F) -> MapErr<Self, F>
226    where
227        Self: Sized,
228        F: FnMut(Self::Error) -> E,
229    {
230        assert_stream::<Result<Self::Ok, E>, _>(MapErr::new(self, f))
231    }
232
233    /// Chain on a computation for when a value is ready, passing the successful
234    /// results to the provided closure `f`.
235    ///
236    /// This function can be used to run a unit of work when the next successful
237    /// value on a stream is ready. The closure provided will be yielded a value
238    /// when ready, and the returned future will then be run to completion to
239    /// produce the next value on this stream.
240    ///
241    /// Any errors produced by this stream will not be passed to the closure,
242    /// and will be passed through.
243    ///
244    /// The returned value of the closure must implement the `TryFuture` trait
245    /// and can represent some more work to be done before the composed stream
246    /// is finished.
247    ///
248    /// Note that this function consumes the receiving stream and returns a
249    /// wrapped version of it.
250    ///
251    /// To process the entire stream and return a single future representing
252    /// success or error, use `try_for_each` instead.
253    ///
254    /// # Examples
255    ///
256    /// ```
257    /// use futures::channel::mpsc;
258    /// use futures::future;
259    /// use futures::stream::TryStreamExt;
260    ///
261    /// let (_tx, rx) = mpsc::channel::<Result<i32, ()>>(1);
262    ///
263    /// let rx = rx.and_then(|result| {
264    ///     future::ok(if result % 2 == 0 {
265    ///         Some(result)
266    ///     } else {
267    ///         None
268    ///     })
269    /// });
270    /// ```
271    fn and_then<Fut, F>(self, f: F) -> AndThen<Self, Fut, F>
272    where
273        F: FnMut(Self::Ok) -> Fut,
274        Fut: TryFuture<Error = Self::Error>,
275        Self: Sized,
276    {
277        assert_stream::<Result<Fut::Ok, Fut::Error>, _>(AndThen::new(self, f))
278    }
279
280    /// Chain on a computation for when an error happens, passing the
281    /// erroneous result to the provided closure `f`.
282    ///
283    /// This function can be used to run a unit of work and attempt to recover from
284    /// an error if one happens. The closure provided will be yielded an error
285    /// when one appears, and the returned future will then be run to completion
286    /// to produce the next value on this stream.
287    ///
288    /// Any successful values produced by this stream will not be passed to the
289    /// closure, and will be passed through.
290    ///
291    /// The returned value of the closure must implement the [`TryFuture`](futures_core::future::TryFuture) trait
292    /// and can represent some more work to be done before the composed stream
293    /// is finished.
294    ///
295    /// Note that this function consumes the receiving stream and returns a
296    /// wrapped version of it.
297    fn or_else<Fut, F>(self, f: F) -> OrElse<Self, Fut, F>
298    where
299        F: FnMut(Self::Error) -> Fut,
300        Fut: TryFuture<Ok = Self::Ok>,
301        Self: Sized,
302    {
303        assert_stream::<Result<Self::Ok, Fut::Error>, _>(OrElse::new(self, f))
304    }
305
306    /// Do something with the success value of this stream, afterwards passing
307    /// it on.
308    ///
309    /// This is similar to the `StreamExt::inspect` method where it allows
310    /// easily inspecting the success value as it passes through the stream, for
311    /// example to debug what's going on.
312    fn inspect_ok<F>(self, f: F) -> InspectOk<Self, F>
313    where
314        F: FnMut(&Self::Ok),
315        Self: Sized,
316    {
317        assert_stream::<Result<Self::Ok, Self::Error>, _>(InspectOk::new(self, f))
318    }
319
320    /// Do something with the error value of this stream, afterwards passing it on.
321    ///
322    /// This is similar to the `StreamExt::inspect` method where it allows
323    /// easily inspecting the error value as it passes through the stream, for
324    /// example to debug what's going on.
325    fn inspect_err<F>(self, f: F) -> InspectErr<Self, F>
326    where
327        F: FnMut(&Self::Error),
328        Self: Sized,
329    {
330        assert_stream::<Result<Self::Ok, Self::Error>, _>(InspectErr::new(self, f))
331    }
332
333    /// Wraps a [`TryStream`] into a type that implements
334    /// [`Stream`](futures_core::stream::Stream)
335    ///
336    /// [`TryStream`]s currently do not implement the
337    /// [`Stream`](futures_core::stream::Stream) trait because of limitations
338    /// of the compiler.
339    ///
340    /// # Examples
341    ///
342    /// ```
343    /// use futures::stream::{Stream, TryStream, TryStreamExt};
344    ///
345    /// # type T = i32;
346    /// # type E = ();
347    /// fn make_try_stream() -> impl TryStream<Ok = T, Error = E> { // ... }
348    /// # futures::stream::empty()
349    /// # }
350    /// fn take_stream(stream: impl Stream<Item = Result<T, E>>) { /* ... */ }
351    ///
352    /// take_stream(make_try_stream().into_stream());
353    /// ```
354    fn into_stream(self) -> IntoStream<Self>
355    where
356        Self: Sized,
357    {
358        assert_stream::<Result<Self::Ok, Self::Error>, _>(IntoStream::new(self))
359    }
360
361    /// Creates a future that attempts to resolve the next item in the stream.
362    /// If an error is encountered before the next item, the error is returned
363    /// instead.
364    ///
    /// This is similar to the `StreamExt::next` combinator, but returns a
366    /// `Result<Option<T>, E>` rather than an `Option<Result<T, E>>`, making
367    /// for easy use with the `?` operator.
368    ///
369    /// # Examples
370    ///
371    /// ```
372    /// # futures::executor::block_on(async {
373    /// use futures::stream::{self, TryStreamExt};
374    ///
375    /// let mut stream = stream::iter(vec![Ok(()), Err(())]);
376    ///
377    /// assert_eq!(stream.try_next().await, Ok(Some(())));
378    /// assert_eq!(stream.try_next().await, Err(()));
379    /// # })
380    /// ```
381    fn try_next(&mut self) -> TryNext<'_, Self>
382    where
383        Self: Unpin,
384    {
385        assert_future::<Result<Option<Self::Ok>, Self::Error>, _>(TryNext::new(self))
386    }
387
388    /// Attempts to run this stream to completion, executing the provided
389    /// asynchronous closure for each element on the stream.
390    ///
391    /// The provided closure will be called for each item this stream produces,
392    /// yielding a future. That future will then be executed to completion
393    /// before moving on to the next item.
394    ///
395    /// The returned value is a [`Future`](futures_core::future::Future) where the
396    /// [`Output`](futures_core::future::Future::Output) type is
397    /// `Result<(), Self::Error>`. If any of the intermediate
398    /// futures or the stream returns an error, this future will return
399    /// immediately with an error.
400    ///
401    /// # Examples
402    ///
403    /// ```
404    /// # futures::executor::block_on(async {
405    /// use futures::future;
406    /// use futures::stream::{self, TryStreamExt};
407    ///
408    /// let mut x = 0i32;
409    ///
410    /// {
411    ///     let fut = stream::repeat(Ok(1)).try_for_each(|item| {
412    ///         x += item;
413    ///         future::ready(if x == 3 { Err(()) } else { Ok(()) })
414    ///     });
415    ///     assert_eq!(fut.await, Err(()));
416    /// }
417    ///
418    /// assert_eq!(x, 3);
419    /// # })
420    /// ```
421    fn try_for_each<Fut, F>(self, f: F) -> TryForEach<Self, Fut, F>
422    where
423        F: FnMut(Self::Ok) -> Fut,
424        Fut: TryFuture<Ok = (), Error = Self::Error>,
425        Self: Sized,
426    {
427        assert_future::<Result<(), Self::Error>, _>(TryForEach::new(self, f))
428    }
429
430    /// Skip elements on this stream while the provided asynchronous predicate
431    /// resolves to `true`.
432    ///
433    /// This function is similar to
434    /// [`StreamExt::skip_while`](crate::stream::StreamExt::skip_while) but exits
435    /// early if an error occurs.
436    ///
437    /// # Examples
438    ///
439    /// ```
440    /// # futures::executor::block_on(async {
441    /// use futures::future;
442    /// use futures::stream::{self, TryStreamExt};
443    ///
444    /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(3), Ok(2)]);
445    /// let stream = stream.try_skip_while(|x| future::ready(Ok(*x < 3)));
446    ///
447    /// let output: Result<Vec<i32>, i32> = stream.try_collect().await;
448    /// assert_eq!(output, Ok(vec![3, 2]));
449    /// # })
450    /// ```
451    fn try_skip_while<Fut, F>(self, f: F) -> TrySkipWhile<Self, Fut, F>
452    where
453        F: FnMut(&Self::Ok) -> Fut,
454        Fut: TryFuture<Ok = bool, Error = Self::Error>,
455        Self: Sized,
456    {
457        assert_stream::<Result<Self::Ok, Self::Error>, _>(TrySkipWhile::new(self, f))
458    }
459
460    /// Take elements on this stream while the provided asynchronous predicate
461    /// resolves to `true`.
462    ///
463    /// This function is similar to
464    /// [`StreamExt::take_while`](crate::stream::StreamExt::take_while) but exits
465    /// early if an error occurs.
466    ///
467    /// # Examples
468    ///
469    /// ```
470    /// # futures::executor::block_on(async {
471    /// use futures::future;
472    /// use futures::stream::{self, TryStreamExt};
473    ///
474    /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Ok(2)]);
475    /// let stream = stream.try_take_while(|x| future::ready(Ok(*x < 3)));
476    ///
477    /// let output: Result<Vec<i32>, i32> = stream.try_collect().await;
478    /// assert_eq!(output, Ok(vec![1, 2]));
479    /// # })
480    /// ```
481    fn try_take_while<Fut, F>(self, f: F) -> TryTakeWhile<Self, Fut, F>
482    where
483        F: FnMut(&Self::Ok) -> Fut,
484        Fut: TryFuture<Ok = bool, Error = Self::Error>,
485        Self: Sized,
486    {
487        assert_stream::<Result<Self::Ok, Self::Error>, _>(TryTakeWhile::new(self, f))
488    }
489
490    /// Attempts to run this stream to completion, executing the provided asynchronous
491    /// closure for each element on the stream concurrently as elements become
492    /// available, exiting as soon as an error occurs.
493    ///
494    /// This is similar to
495    /// [`StreamExt::for_each_concurrent`](crate::stream::StreamExt::for_each_concurrent),
496    /// but will resolve to an error immediately if the underlying stream or the provided
497    /// closure return an error.
498    ///
499    /// This method is only available when the `std` or `alloc` feature of this
500    /// library is activated, and it is activated by default.
501    ///
502    /// # Examples
503    ///
504    /// ```
505    /// # futures::executor::block_on(async {
506    /// use futures::channel::oneshot;
507    /// use futures::stream::{self, StreamExt, TryStreamExt};
508    ///
509    /// let (tx1, rx1) = oneshot::channel();
510    /// let (tx2, rx2) = oneshot::channel();
511    /// let (_tx3, rx3) = oneshot::channel();
512    ///
513    /// let stream = stream::iter(vec![rx1, rx2, rx3]);
514    /// let fut = stream.map(Ok).try_for_each_concurrent(
515    ///     /* limit */ 2,
516    ///     |rx| async move {
517    ///         let res: Result<(), oneshot::Canceled> = rx.await;
518    ///         res
519    ///     }
520    /// );
521    ///
522    /// tx1.send(()).unwrap();
523    /// // Drop the second sender so that `rx2` resolves to `Canceled`.
524    /// drop(tx2);
525    ///
526    /// // The final result is an error because the second future
527    /// // resulted in an error.
528    /// assert_eq!(Err(oneshot::Canceled), fut.await);
529    /// # })
530    /// ```
531    #[cfg(not(futures_no_atomic_cas))]
532    #[cfg(feature = "alloc")]
533    fn try_for_each_concurrent<Fut, F>(
534        self,
535        limit: impl Into<Option<usize>>,
536        f: F,
537    ) -> TryForEachConcurrent<Self, Fut, F>
538    where
539        F: FnMut(Self::Ok) -> Fut,
540        Fut: Future<Output = Result<(), Self::Error>>,
541        Self: Sized,
542    {
543        assert_future::<Result<(), Self::Error>, _>(TryForEachConcurrent::new(
544            self,
545            limit.into(),
546            f,
547        ))
548    }
549
550    /// Attempt to transform a stream into a collection,
551    /// returning a future representing the result of that computation.
552    ///
553    /// This combinator will collect all successful results of this stream and
554    /// collect them into the specified collection type. If an error happens then all
555    /// collected elements will be dropped and the error will be returned.
556    ///
557    /// The returned future will be resolved when the stream terminates.
558    ///
559    /// # Examples
560    ///
561    /// ```
562    /// # futures::executor::block_on(async {
563    /// use futures::channel::mpsc;
564    /// use futures::stream::TryStreamExt;
565    /// use std::thread;
566    ///
567    /// let (tx, rx) = mpsc::unbounded();
568    ///
569    /// thread::spawn(move || {
570    ///     for i in 1..=5 {
571    ///         tx.unbounded_send(Ok(i)).unwrap();
572    ///     }
573    ///     tx.unbounded_send(Err(6)).unwrap();
574    /// });
575    ///
576    /// let output: Result<Vec<i32>, i32> = rx.try_collect().await;
577    /// assert_eq!(output, Err(6));
578    /// # })
579    /// ```
580    fn try_collect<C: Default + Extend<Self::Ok>>(self) -> TryCollect<Self, C>
581    where
582        Self: Sized,
583    {
584        assert_future::<Result<C, Self::Error>, _>(TryCollect::new(self))
585    }
586
587    /// An adaptor for chunking up successful items of the stream inside a vector.
588    ///
589    /// This combinator will attempt to pull successful items from this stream and buffer
590    /// them into a local vector. At most `capacity` items will get buffered
591    /// before they're yielded from the returned stream.
592    ///
    /// Note that the vectors returned from this stream may not always have
594    /// `capacity` elements. If the underlying stream ended and only a partial
595    /// vector was created, it'll be returned. Additionally if an error happens
596    /// from the underlying stream then the currently buffered items will be
597    /// yielded.
598    ///
599    /// This method is only available when the `std` or `alloc` feature of this
600    /// library is activated, and it is activated by default.
601    ///
    /// This function is similar to
    /// [`StreamExt::chunks`](crate::stream::StreamExt::chunks) but cuts the
    /// current chunk short if an error occurs, yielding the buffered items
    /// together with the error.
605    ///
606    /// # Examples
607    ///
608    /// ```
609    /// # futures::executor::block_on(async {
610    /// use futures::stream::{self, TryChunksError, TryStreamExt};
611    ///
612    /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Err(4), Ok(5), Ok(6)]);
613    /// let mut stream = stream.try_chunks(2);
614    ///
615    /// assert_eq!(stream.try_next().await, Ok(Some(vec![1, 2])));
616    /// assert_eq!(stream.try_next().await, Err(TryChunksError(vec![3], 4)));
617    /// assert_eq!(stream.try_next().await, Ok(Some(vec![5, 6])));
618    /// # })
619    /// ```
620    ///
621    /// # Panics
622    ///
623    /// This method will panic if `capacity` is zero.
624    #[cfg(feature = "alloc")]
625    fn try_chunks(self, capacity: usize) -> TryChunks<Self>
626    where
627        Self: Sized,
628    {
629        assert_stream::<Result<Vec<Self::Ok>, TryChunksError<Self::Ok, Self::Error>>, _>(
630            TryChunks::new(self, capacity),
631        )
632    }
633
634    /// Attempt to filter the values produced by this stream according to the
635    /// provided asynchronous closure.
636    ///
637    /// As values of this stream are made available, the provided predicate `f`
638    /// will be run on them. If the predicate returns a `Future` which resolves
639    /// to `true`, then the stream will yield the value, but if the predicate
    /// returns a `Future` which resolves to `false`, then the value will be
641    /// discarded and the next value will be produced.
642    ///
643    /// All errors are passed through without filtering in this combinator.
644    ///
645    /// Note that this function consumes the stream passed into it and returns a
646    /// wrapped version of it, similar to the existing `filter` methods in
647    /// the standard library.
648    ///
649    /// # Examples
650    /// ```
651    /// # futures::executor::block_on(async {
652    /// use futures::future;
653    /// use futures::stream::{self, StreamExt, TryStreamExt};
654    ///
655    /// let stream = stream::iter(vec![Ok(1i32), Ok(2i32), Ok(3i32), Err("error")]);
656    /// let mut evens = stream.try_filter(|x| {
657    ///     future::ready(x % 2 == 0)
658    /// });
659    ///
660    /// assert_eq!(evens.next().await, Some(Ok(2)));
661    /// assert_eq!(evens.next().await, Some(Err("error")));
662    /// # })
663    /// ```
664    fn try_filter<Fut, F>(self, f: F) -> TryFilter<Self, Fut, F>
665    where
666        Fut: Future<Output = bool>,
667        F: FnMut(&Self::Ok) -> Fut,
668        Self: Sized,
669    {
670        assert_stream::<Result<Self::Ok, Self::Error>, _>(TryFilter::new(self, f))
671    }
672
673    /// Attempt to filter the values produced by this stream while
674    /// simultaneously mapping them to a different type according to the
675    /// provided asynchronous closure.
676    ///
677    /// As values of this stream are made available, the provided function will
678    /// be run on them. If the future returned by the predicate `f` resolves to
679    /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
680    /// it resolves to [`None`] then the next value will be produced.
681    ///
682    /// All errors are passed through without filtering in this combinator.
683    ///
684    /// Note that this function consumes the stream passed into it and returns a
685    /// wrapped version of it, similar to the existing `filter_map` methods in
686    /// the standard library.
687    ///
688    /// # Examples
689    /// ```
690    /// # futures::executor::block_on(async {
691    /// use futures::stream::{self, StreamExt, TryStreamExt};
692    /// use futures::pin_mut;
693    ///
694    /// let stream = stream::iter(vec![Ok(1i32), Ok(6i32), Err("error")]);
695    /// let halves = stream.try_filter_map(|x| async move {
696    ///     let ret = if x % 2 == 0 { Some(x / 2) } else { None };
697    ///     Ok(ret)
698    /// });
699    ///
700    /// pin_mut!(halves);
701    /// assert_eq!(halves.next().await, Some(Ok(3)));
702    /// assert_eq!(halves.next().await, Some(Err("error")));
703    /// # })
704    /// ```
705    fn try_filter_map<Fut, F, T>(self, f: F) -> TryFilterMap<Self, Fut, F>
706    where
707        Fut: TryFuture<Ok = Option<T>, Error = Self::Error>,
708        F: FnMut(Self::Ok) -> Fut,
709        Self: Sized,
710    {
711        assert_stream::<Result<T, Self::Error>, _>(TryFilterMap::new(self, f))
712    }
713
714    /// Flattens a stream of streams into just one continuous stream.
715    ///
716    /// If this stream's elements are themselves streams then this combinator
717    /// will flatten out the entire stream to one long chain of elements. Any
718    /// errors are passed through without looking at them, but otherwise each
719    /// individual stream will get exhausted before moving on to the next.
720    ///
721    /// # Examples
722    ///
723    /// ```
724    /// # futures::executor::block_on(async {
725    /// use futures::channel::mpsc;
726    /// use futures::stream::{StreamExt, TryStreamExt};
727    /// use std::thread;
728    ///
729    /// let (tx1, rx1) = mpsc::unbounded();
730    /// let (tx2, rx2) = mpsc::unbounded();
731    /// let (tx3, rx3) = mpsc::unbounded();
732    ///
733    /// thread::spawn(move || {
734    ///     tx1.unbounded_send(Ok(1)).unwrap();
735    /// });
736    /// thread::spawn(move || {
737    ///     tx2.unbounded_send(Ok(2)).unwrap();
738    ///     tx2.unbounded_send(Err(3)).unwrap();
739    ///     tx2.unbounded_send(Ok(4)).unwrap();
740    /// });
741    /// thread::spawn(move || {
742    ///     tx3.unbounded_send(Ok(rx1)).unwrap();
743    ///     tx3.unbounded_send(Ok(rx2)).unwrap();
744    ///     tx3.unbounded_send(Err(5)).unwrap();
745    /// });
746    ///
747    /// let mut stream = rx3.try_flatten();
748    /// assert_eq!(stream.next().await, Some(Ok(1)));
749    /// assert_eq!(stream.next().await, Some(Ok(2)));
750    /// assert_eq!(stream.next().await, Some(Err(3)));
751    /// assert_eq!(stream.next().await, Some(Ok(4)));
752    /// assert_eq!(stream.next().await, Some(Err(5)));
753    /// assert_eq!(stream.next().await, None);
754    /// # });
755    /// ```
756    fn try_flatten(self) -> TryFlatten<Self>
757    where
758        Self::Ok: TryStream,
759        <Self::Ok as TryStream>::Error: From<Self::Error>,
760        Self: Sized,
761    {
762        assert_stream::<Result<<Self::Ok as TryStream>::Ok, <Self::Ok as TryStream>::Error>, _>(
763            TryFlatten::new(self),
764        )
765    }
766
767    /// Attempt to execute an accumulating asynchronous computation over a
768    /// stream, collecting all the values into one final result.
769    ///
770    /// This combinator will accumulate all values returned by this stream
771    /// according to the closure provided. The initial state is also provided to
772    /// this method and then is returned again by each execution of the closure.
773    /// Once the entire stream has been exhausted the returned future will
774    /// resolve to this value.
775    ///
776    /// This method is similar to [`fold`](crate::stream::StreamExt::fold), but will
777    /// exit early if an error is encountered in either the stream or the
778    /// provided closure.
779    ///
780    /// # Examples
781    ///
782    /// ```
783    /// # futures::executor::block_on(async {
784    /// use futures::stream::{self, TryStreamExt};
785    ///
786    /// let number_stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2)]);
787    /// let sum = number_stream.try_fold(0, |acc, x| async move { Ok(acc + x) });
788    /// assert_eq!(sum.await, Ok(3));
789    ///
790    /// let number_stream_with_err = stream::iter(vec![Ok::<i32, i32>(1), Err(2), Ok(1)]);
791    /// let sum = number_stream_with_err.try_fold(0, |acc, x| async move { Ok(acc + x) });
792    /// assert_eq!(sum.await, Err(2));
793    /// # })
794    /// ```
795    fn try_fold<T, Fut, F>(self, init: T, f: F) -> TryFold<Self, Fut, T, F>
796    where
797        F: FnMut(T, Self::Ok) -> Fut,
798        Fut: TryFuture<Ok = T, Error = Self::Error>,
799        Self: Sized,
800    {
801        assert_future::<Result<T, Self::Error>, _>(TryFold::new(self, f, init))
802    }
803
804    /// Attempt to concatenate all items of a stream into a single
805    /// extendable destination, returning a future representing the end result.
806    ///
807    /// This combinator will extend the first item with the contents of all
808    /// the subsequent successful results of the stream. If the stream is empty,
809    /// the default value will be returned.
810    ///
811    /// Works with all collections that implement the [`Extend`](std::iter::Extend) trait.
812    ///
813    /// This method is similar to [`concat`](crate::stream::StreamExt::concat), but will
814    /// exit early if an error is encountered in the stream.
815    ///
816    /// # Examples
817    ///
818    /// ```
819    /// # futures::executor::block_on(async {
820    /// use futures::channel::mpsc;
821    /// use futures::stream::TryStreamExt;
822    /// use std::thread;
823    ///
824    /// let (tx, rx) = mpsc::unbounded::<Result<Vec<i32>, ()>>();
825    ///
826    /// thread::spawn(move || {
827    ///     for i in (0..3).rev() {
828    ///         let n = i * 3;
829    ///         tx.unbounded_send(Ok(vec![n + 1, n + 2, n + 3])).unwrap();
830    ///     }
831    /// });
832    ///
833    /// let result = rx.try_concat().await;
834    ///
835    /// assert_eq!(result, Ok(vec![7, 8, 9, 4, 5, 6, 1, 2, 3]));
836    /// # });
837    /// ```
838    fn try_concat(self) -> TryConcat<Self>
839    where
840        Self: Sized,
841        Self::Ok: Extend<<<Self as TryStream>::Ok as IntoIterator>::Item> + IntoIterator + Default,
842    {
843        assert_future::<Result<Self::Ok, Self::Error>, _>(TryConcat::new(self))
844    }
845
846    /// Attempt to execute several futures from a stream concurrently (unordered).
847    ///
848    /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type
849    /// that matches the stream's `Error` type.
850    ///
851    /// This adaptor will buffer up to `n` futures and then return their
852    /// outputs in the order in which they complete. If the underlying stream
853    /// returns an error, it will be immediately propagated.
854    ///
855    /// The returned stream will be a stream of results, each containing either
856    /// an error or a future's output. An error can be produced either by the
857    /// underlying stream itself or by one of the futures it yielded.
858    ///
859    /// This method is only available when the `std` or `alloc` feature of this
860    /// library is activated, and it is activated by default.
861    ///
862    /// # Examples
863    ///
864    /// Results are returned in the order of completion:
865    /// ```
866    /// # futures::executor::block_on(async {
867    /// use futures::channel::oneshot;
868    /// use futures::stream::{self, StreamExt, TryStreamExt};
869    ///
870    /// let (send_one, recv_one) = oneshot::channel();
871    /// let (send_two, recv_two) = oneshot::channel();
872    ///
873    /// let stream_of_futures = stream::iter(vec![Ok(recv_one), Ok(recv_two)]);
874    ///
875    /// let mut buffered = stream_of_futures.try_buffer_unordered(10);
876    ///
877    /// send_two.send(2i32)?;
878    /// assert_eq!(buffered.next().await, Some(Ok(2i32)));
879    ///
880    /// send_one.send(1i32)?;
881    /// assert_eq!(buffered.next().await, Some(Ok(1i32)));
882    ///
883    /// assert_eq!(buffered.next().await, None);
884    /// # Ok::<(), i32>(()) }).unwrap();
885    /// ```
886    ///
887    /// Errors from the underlying stream itself are propagated:
888    /// ```
889    /// # futures::executor::block_on(async {
890    /// use futures::channel::mpsc;
891    /// use futures::stream::{StreamExt, TryStreamExt};
892    ///
893    /// let (sink, stream_of_futures) = mpsc::unbounded();
894    /// let mut buffered = stream_of_futures.try_buffer_unordered(10);
895    ///
896    /// sink.unbounded_send(Ok(async { Ok(7i32) }))?;
897    /// assert_eq!(buffered.next().await, Some(Ok(7i32)));
898    ///
899    /// sink.unbounded_send(Err("error in the stream"))?;
900    /// assert_eq!(buffered.next().await, Some(Err("error in the stream")));
901    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
902    /// ```
903    #[cfg(not(futures_no_atomic_cas))]
904    #[cfg(feature = "alloc")]
905    fn try_buffer_unordered(self, n: usize) -> TryBufferUnordered<Self>
906    where
907        Self::Ok: TryFuture<Error = Self::Error>,
908        Self: Sized,
909    {
910        assert_stream::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>(
911            TryBufferUnordered::new(self, n),
912        )
913    }
914
915    /// Attempt to execute several futures from a stream concurrently.
916    ///
917    /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type
918    /// that matches the stream's `Error` type.
919    ///
920    /// This adaptor will buffer up to `n` futures and then return their
921    /// outputs in the same order as the underlying stream. If the underlying stream returns an error, it will
922    /// be immediately propagated.
923    ///
924    /// The returned stream will be a stream of results, each containing either
925    /// an error or a future's output. An error can be produced either by the
926    /// underlying stream itself or by one of the futures it yielded.
927    ///
928    /// This method is only available when the `std` or `alloc` feature of this
929    /// library is activated, and it is activated by default.
930    ///
931    /// # Examples
932    ///
933    /// Results are returned in the order of addition:
934    /// ```
935    /// # futures::executor::block_on(async {
936    /// use futures::channel::oneshot;
937    /// use futures::future::lazy;
938    /// use futures::stream::{self, StreamExt, TryStreamExt};
939    ///
940    /// let (send_one, recv_one) = oneshot::channel();
941    /// let (send_two, recv_two) = oneshot::channel();
942    ///
943    /// let mut buffered = lazy(move |cx| {
944    ///     let stream_of_futures = stream::iter(vec![Ok(recv_one), Ok(recv_two)]);
945    ///
946    ///     let mut buffered = stream_of_futures.try_buffered(10);
947    ///
948    ///     assert!(buffered.try_poll_next_unpin(cx).is_pending());
949    ///
950    ///     send_two.send(2i32)?;
951    ///     assert!(buffered.try_poll_next_unpin(cx).is_pending());
952    ///     Ok::<_, i32>(buffered)
953    /// }).await?;
954    ///
955    /// send_one.send(1i32)?;
956    /// assert_eq!(buffered.next().await, Some(Ok(1i32)));
957    /// assert_eq!(buffered.next().await, Some(Ok(2i32)));
958    ///
959    /// assert_eq!(buffered.next().await, None);
960    /// # Ok::<(), i32>(()) }).unwrap();
961    /// ```
962    ///
963    /// Errors from the underlying stream itself are propagated:
964    /// ```
965    /// # futures::executor::block_on(async {
966    /// use futures::channel::mpsc;
967    /// use futures::stream::{StreamExt, TryStreamExt};
968    ///
969    /// let (sink, stream_of_futures) = mpsc::unbounded();
970    /// let mut buffered = stream_of_futures.try_buffered(10);
971    ///
972    /// sink.unbounded_send(Ok(async { Ok(7i32) }))?;
973    /// assert_eq!(buffered.next().await, Some(Ok(7i32)));
974    ///
975    /// sink.unbounded_send(Err("error in the stream"))?;
976    /// assert_eq!(buffered.next().await, Some(Err("error in the stream")));
977    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
978    /// ```
979    #[cfg(not(futures_no_atomic_cas))]
980    #[cfg(feature = "alloc")]
981    fn try_buffered(self, n: usize) -> TryBuffered<Self>
982    where
983        Self::Ok: TryFuture<Error = Self::Error>,
984        Self: Sized,
985    {
986        assert_stream::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>(TryBuffered::new(
987            self, n,
988        ))
989    }
990
991    // TODO: false positive warning from rustdoc. Verify once #43466 settles
992    //
993    /// A convenience method for calling [`TryStream::try_poll_next`] on [`Unpin`]
994    /// stream types.
995    fn try_poll_next_unpin(
996        &mut self,
997        cx: &mut Context<'_>,
998    ) -> Poll<Option<Result<Self::Ok, Self::Error>>>
999    where
1000        Self: Unpin,
1001    {
1002        Pin::new(self).try_poll_next(cx)
1003    }
1004
1005    /// Wraps a [`TryStream`] into a stream compatible with libraries using
1006    /// futures 0.1 `Stream`. Requires the `compat` feature to be enabled.
1007    /// ```
1008    /// # if cfg!(miri) { return; } // Miri does not support epoll
1009    /// use futures::future::{FutureExt, TryFutureExt};
1010    /// # let (tx, rx) = futures::channel::oneshot::channel();
1011    ///
1012    /// let future03 = async {
1013    ///     println!("Running on the pool");
1014    ///     tx.send(42).unwrap();
1015    /// };
1016    ///
1017    /// let future01 = future03
1018    ///     .unit_error() // Make it a TryFuture
1019    ///     .boxed()  // Make it Unpin
1020    ///     .compat();
1021    ///
1022    /// tokio::run(future01);
1023    /// # assert_eq!(42, futures::executor::block_on(rx).unwrap());
1024    /// ```
1025    #[cfg(feature = "compat")]
1026    #[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
1027    fn compat(self) -> Compat<Self>
1028    where
1029        Self: Sized + Unpin,
1030    {
1031        Compat::new(self)
1032    }
1033
1034    /// Adapter that converts this stream into an [`AsyncBufRead`](crate::io::AsyncBufRead).
1035    ///
1036    /// This method is only available when the `std` feature of this
1037    /// library is activated, and it is activated by default.
1038    ///
1039    /// # Examples
1040    ///
1041    /// ```
1042    /// # futures::executor::block_on(async {
1043    /// use futures::stream::{self, TryStreamExt};
1044    /// use futures::io::AsyncReadExt;
1045    ///
1046    /// let stream = stream::iter([Ok(vec![1, 2, 3]), Ok(vec![4, 5])]);
1047    /// let mut reader = stream.into_async_read();
1048    ///
1049    /// let mut buf = Vec::new();
1050    /// reader.read_to_end(&mut buf).await.unwrap();
1051    /// assert_eq!(buf, [1, 2, 3, 4, 5]);
1052    /// # })
1053    /// ```
1054    #[cfg(feature = "io")]
1055    #[cfg_attr(docsrs, doc(cfg(feature = "io")))]
1056    #[cfg(feature = "std")]
1057    fn into_async_read(self) -> IntoAsyncRead<Self>
1058    where
1059        Self: Sized + TryStreamExt<Error = std::io::Error>,
1060        Self::Ok: AsRef<[u8]>,
1061    {
1062        crate::io::assert_read(IntoAsyncRead::new(self))
1063    }
1064}