futures_util/stream/try_stream/mod.rs
1//! Streams
2//!
3//! This module contains a number of functions for working with `Streams`s
4//! that return `Result`s, allowing for short-circuiting computations.
5
6#[cfg(feature = "compat")]
7use crate::compat::Compat;
8use crate::fns::{
9 inspect_err_fn, inspect_ok_fn, into_fn, map_err_fn, map_ok_fn, InspectErrFn, InspectOkFn,
10 IntoFn, MapErrFn, MapOkFn,
11};
12use crate::future::assert_future;
13use crate::stream::assert_stream;
14use crate::stream::{Inspect, Map};
15#[cfg(feature = "alloc")]
16use alloc::vec::Vec;
17use core::pin::Pin;
18use futures_core::{
19 future::{Future, TryFuture},
20 stream::TryStream,
21 task::{Context, Poll},
22};
23
24mod and_then;
25#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
26pub use self::and_then::AndThen;
27
28delegate_all!(
29 /// Stream for the [`err_into`](super::TryStreamExt::err_into) method.
30 ErrInto<St, E>(
31 MapErr<St, IntoFn<E>>
32 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St| MapErr::new(x, into_fn())]
33);
34
35delegate_all!(
36 /// Stream for the [`inspect_ok`](super::TryStreamExt::inspect_ok) method.
37 InspectOk<St, F>(
38 Inspect<IntoStream<St>, InspectOkFn<F>>
39 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_ok_fn(f))]
40);
41
42delegate_all!(
43 /// Stream for the [`inspect_err`](super::TryStreamExt::inspect_err) method.
44 InspectErr<St, F>(
45 Inspect<IntoStream<St>, InspectErrFn<F>>
46 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_err_fn(f))]
47);
48
49mod into_stream;
50#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
51pub use self::into_stream::IntoStream;
52
53delegate_all!(
54 /// Stream for the [`map_ok`](super::TryStreamExt::map_ok) method.
55 MapOk<St, F>(
56 Map<IntoStream<St>, MapOkFn<F>>
57 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_ok_fn(f))]
58);
59
60delegate_all!(
61 /// Stream for the [`map_err`](super::TryStreamExt::map_err) method.
62 MapErr<St, F>(
63 Map<IntoStream<St>, MapErrFn<F>>
64 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_err_fn(f))]
65);
66
67mod or_else;
68#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
69pub use self::or_else::OrElse;
70
71mod try_next;
72#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
73pub use self::try_next::TryNext;
74
75mod try_for_each;
76#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
77pub use self::try_for_each::TryForEach;
78
79mod try_filter;
80#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
81pub use self::try_filter::TryFilter;
82
83mod try_filter_map;
84#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
85pub use self::try_filter_map::TryFilterMap;
86
87mod try_flatten;
88#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
89pub use self::try_flatten::TryFlatten;
90
91mod try_collect;
92#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
93pub use self::try_collect::TryCollect;
94
95mod try_concat;
96#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
97pub use self::try_concat::TryConcat;
98
99#[cfg(feature = "alloc")]
100mod try_chunks;
101#[cfg(feature = "alloc")]
102#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
103pub use self::try_chunks::{TryChunks, TryChunksError};
104
105mod try_fold;
106#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
107pub use self::try_fold::TryFold;
108
109mod try_unfold;
110#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
111pub use self::try_unfold::{try_unfold, TryUnfold};
112
113mod try_skip_while;
114#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
115pub use self::try_skip_while::TrySkipWhile;
116
117mod try_take_while;
118#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
119pub use self::try_take_while::TryTakeWhile;
120
121#[cfg(not(futures_no_atomic_cas))]
122#[cfg(feature = "alloc")]
123mod try_buffer_unordered;
124#[cfg(not(futures_no_atomic_cas))]
125#[cfg(feature = "alloc")]
126#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
127pub use self::try_buffer_unordered::TryBufferUnordered;
128
129#[cfg(not(futures_no_atomic_cas))]
130#[cfg(feature = "alloc")]
131mod try_buffered;
132#[cfg(not(futures_no_atomic_cas))]
133#[cfg(feature = "alloc")]
134#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
135pub use self::try_buffered::TryBuffered;
136
137#[cfg(not(futures_no_atomic_cas))]
138#[cfg(feature = "alloc")]
139mod try_for_each_concurrent;
140#[cfg(not(futures_no_atomic_cas))]
141#[cfg(feature = "alloc")]
142#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
143pub use self::try_for_each_concurrent::TryForEachConcurrent;
144
145#[cfg(feature = "io")]
146#[cfg(feature = "std")]
147mod into_async_read;
148#[cfg(feature = "io")]
149#[cfg_attr(docsrs, doc(cfg(feature = "io")))]
150#[cfg(feature = "std")]
151#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
152pub use self::into_async_read::IntoAsyncRead;
153
154impl<S: ?Sized + TryStream> TryStreamExt for S {}
155
156/// Adapters specific to `Result`-returning streams
157pub trait TryStreamExt: TryStream {
158 /// Wraps the current stream in a new stream which converts the error type
159 /// into the one provided.
160 ///
161 /// # Examples
162 ///
163 /// ```
164 /// # futures::executor::block_on(async {
165 /// use futures::stream::{self, TryStreamExt};
166 ///
167 /// let mut stream =
168 /// stream::iter(vec![Ok(()), Err(5i32)])
169 /// .err_into::<i64>();
170 ///
171 /// assert_eq!(stream.try_next().await, Ok(Some(())));
172 /// assert_eq!(stream.try_next().await, Err(5i64));
173 /// # })
174 /// ```
175 fn err_into<E>(self) -> ErrInto<Self, E>
176 where
177 Self: Sized,
178 Self::Error: Into<E>,
179 {
180 assert_stream::<Result<Self::Ok, E>, _>(ErrInto::new(self))
181 }
182
183 /// Wraps the current stream in a new stream which maps the success value
184 /// using the provided closure.
185 ///
186 /// # Examples
187 ///
188 /// ```
189 /// # futures::executor::block_on(async {
190 /// use futures::stream::{self, TryStreamExt};
191 ///
192 /// let mut stream =
193 /// stream::iter(vec![Ok(5), Err(0)])
194 /// .map_ok(|x| x + 2);
195 ///
196 /// assert_eq!(stream.try_next().await, Ok(Some(7)));
197 /// assert_eq!(stream.try_next().await, Err(0));
198 /// # })
199 /// ```
200 fn map_ok<T, F>(self, f: F) -> MapOk<Self, F>
201 where
202 Self: Sized,
203 F: FnMut(Self::Ok) -> T,
204 {
205 assert_stream::<Result<T, Self::Error>, _>(MapOk::new(self, f))
206 }
207
208 /// Wraps the current stream in a new stream which maps the error value
209 /// using the provided closure.
210 ///
211 /// # Examples
212 ///
213 /// ```
214 /// # futures::executor::block_on(async {
215 /// use futures::stream::{self, TryStreamExt};
216 ///
217 /// let mut stream =
218 /// stream::iter(vec![Ok(5), Err(0)])
219 /// .map_err(|x| x + 2);
220 ///
221 /// assert_eq!(stream.try_next().await, Ok(Some(5)));
222 /// assert_eq!(stream.try_next().await, Err(2));
223 /// # })
224 /// ```
225 fn map_err<E, F>(self, f: F) -> MapErr<Self, F>
226 where
227 Self: Sized,
228 F: FnMut(Self::Error) -> E,
229 {
230 assert_stream::<Result<Self::Ok, E>, _>(MapErr::new(self, f))
231 }
232
233 /// Chain on a computation for when a value is ready, passing the successful
234 /// results to the provided closure `f`.
235 ///
236 /// This function can be used to run a unit of work when the next successful
237 /// value on a stream is ready. The closure provided will be yielded a value
238 /// when ready, and the returned future will then be run to completion to
239 /// produce the next value on this stream.
240 ///
241 /// Any errors produced by this stream will not be passed to the closure,
242 /// and will be passed through.
243 ///
244 /// The returned value of the closure must implement the `TryFuture` trait
245 /// and can represent some more work to be done before the composed stream
246 /// is finished.
247 ///
248 /// Note that this function consumes the receiving stream and returns a
249 /// wrapped version of it.
250 ///
251 /// To process the entire stream and return a single future representing
252 /// success or error, use `try_for_each` instead.
253 ///
254 /// # Examples
255 ///
256 /// ```
257 /// use futures::channel::mpsc;
258 /// use futures::future;
259 /// use futures::stream::TryStreamExt;
260 ///
261 /// let (_tx, rx) = mpsc::channel::<Result<i32, ()>>(1);
262 ///
263 /// let rx = rx.and_then(|result| {
264 /// future::ok(if result % 2 == 0 {
265 /// Some(result)
266 /// } else {
267 /// None
268 /// })
269 /// });
270 /// ```
271 fn and_then<Fut, F>(self, f: F) -> AndThen<Self, Fut, F>
272 where
273 F: FnMut(Self::Ok) -> Fut,
274 Fut: TryFuture<Error = Self::Error>,
275 Self: Sized,
276 {
277 assert_stream::<Result<Fut::Ok, Fut::Error>, _>(AndThen::new(self, f))
278 }
279
280 /// Chain on a computation for when an error happens, passing the
281 /// erroneous result to the provided closure `f`.
282 ///
283 /// This function can be used to run a unit of work and attempt to recover from
284 /// an error if one happens. The closure provided will be yielded an error
285 /// when one appears, and the returned future will then be run to completion
286 /// to produce the next value on this stream.
287 ///
288 /// Any successful values produced by this stream will not be passed to the
289 /// closure, and will be passed through.
290 ///
291 /// The returned value of the closure must implement the [`TryFuture`](futures_core::future::TryFuture) trait
292 /// and can represent some more work to be done before the composed stream
293 /// is finished.
294 ///
295 /// Note that this function consumes the receiving stream and returns a
296 /// wrapped version of it.
297 fn or_else<Fut, F>(self, f: F) -> OrElse<Self, Fut, F>
298 where
299 F: FnMut(Self::Error) -> Fut,
300 Fut: TryFuture<Ok = Self::Ok>,
301 Self: Sized,
302 {
303 assert_stream::<Result<Self::Ok, Fut::Error>, _>(OrElse::new(self, f))
304 }
305
306 /// Do something with the success value of this stream, afterwards passing
307 /// it on.
308 ///
309 /// This is similar to the `StreamExt::inspect` method where it allows
310 /// easily inspecting the success value as it passes through the stream, for
311 /// example to debug what's going on.
312 fn inspect_ok<F>(self, f: F) -> InspectOk<Self, F>
313 where
314 F: FnMut(&Self::Ok),
315 Self: Sized,
316 {
317 assert_stream::<Result<Self::Ok, Self::Error>, _>(InspectOk::new(self, f))
318 }
319
320 /// Do something with the error value of this stream, afterwards passing it on.
321 ///
322 /// This is similar to the `StreamExt::inspect` method where it allows
323 /// easily inspecting the error value as it passes through the stream, for
324 /// example to debug what's going on.
325 fn inspect_err<F>(self, f: F) -> InspectErr<Self, F>
326 where
327 F: FnMut(&Self::Error),
328 Self: Sized,
329 {
330 assert_stream::<Result<Self::Ok, Self::Error>, _>(InspectErr::new(self, f))
331 }
332
333 /// Wraps a [`TryStream`] into a type that implements
334 /// [`Stream`](futures_core::stream::Stream)
335 ///
336 /// [`TryStream`]s currently do not implement the
337 /// [`Stream`](futures_core::stream::Stream) trait because of limitations
338 /// of the compiler.
339 ///
340 /// # Examples
341 ///
342 /// ```
343 /// use futures::stream::{Stream, TryStream, TryStreamExt};
344 ///
345 /// # type T = i32;
346 /// # type E = ();
347 /// fn make_try_stream() -> impl TryStream<Ok = T, Error = E> { // ... }
348 /// # futures::stream::empty()
349 /// # }
350 /// fn take_stream(stream: impl Stream<Item = Result<T, E>>) { /* ... */ }
351 ///
352 /// take_stream(make_try_stream().into_stream());
353 /// ```
354 fn into_stream(self) -> IntoStream<Self>
355 where
356 Self: Sized,
357 {
358 assert_stream::<Result<Self::Ok, Self::Error>, _>(IntoStream::new(self))
359 }
360
361 /// Creates a future that attempts to resolve the next item in the stream.
362 /// If an error is encountered before the next item, the error is returned
363 /// instead.
364 ///
365 /// This is similar to the `Stream::next` combinator, but returns a
366 /// `Result<Option<T>, E>` rather than an `Option<Result<T, E>>`, making
367 /// for easy use with the `?` operator.
368 ///
369 /// # Examples
370 ///
371 /// ```
372 /// # futures::executor::block_on(async {
373 /// use futures::stream::{self, TryStreamExt};
374 ///
375 /// let mut stream = stream::iter(vec![Ok(()), Err(())]);
376 ///
377 /// assert_eq!(stream.try_next().await, Ok(Some(())));
378 /// assert_eq!(stream.try_next().await, Err(()));
379 /// # })
380 /// ```
381 fn try_next(&mut self) -> TryNext<'_, Self>
382 where
383 Self: Unpin,
384 {
385 assert_future::<Result<Option<Self::Ok>, Self::Error>, _>(TryNext::new(self))
386 }
387
388 /// Attempts to run this stream to completion, executing the provided
389 /// asynchronous closure for each element on the stream.
390 ///
391 /// The provided closure will be called for each item this stream produces,
392 /// yielding a future. That future will then be executed to completion
393 /// before moving on to the next item.
394 ///
395 /// The returned value is a [`Future`](futures_core::future::Future) where the
396 /// [`Output`](futures_core::future::Future::Output) type is
397 /// `Result<(), Self::Error>`. If any of the intermediate
398 /// futures or the stream returns an error, this future will return
399 /// immediately with an error.
400 ///
401 /// # Examples
402 ///
403 /// ```
404 /// # futures::executor::block_on(async {
405 /// use futures::future;
406 /// use futures::stream::{self, TryStreamExt};
407 ///
408 /// let mut x = 0i32;
409 ///
410 /// {
411 /// let fut = stream::repeat(Ok(1)).try_for_each(|item| {
412 /// x += item;
413 /// future::ready(if x == 3 { Err(()) } else { Ok(()) })
414 /// });
415 /// assert_eq!(fut.await, Err(()));
416 /// }
417 ///
418 /// assert_eq!(x, 3);
419 /// # })
420 /// ```
421 fn try_for_each<Fut, F>(self, f: F) -> TryForEach<Self, Fut, F>
422 where
423 F: FnMut(Self::Ok) -> Fut,
424 Fut: TryFuture<Ok = (), Error = Self::Error>,
425 Self: Sized,
426 {
427 assert_future::<Result<(), Self::Error>, _>(TryForEach::new(self, f))
428 }
429
430 /// Skip elements on this stream while the provided asynchronous predicate
431 /// resolves to `true`.
432 ///
433 /// This function is similar to
434 /// [`StreamExt::skip_while`](crate::stream::StreamExt::skip_while) but exits
435 /// early if an error occurs.
436 ///
437 /// # Examples
438 ///
439 /// ```
440 /// # futures::executor::block_on(async {
441 /// use futures::future;
442 /// use futures::stream::{self, TryStreamExt};
443 ///
444 /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(3), Ok(2)]);
445 /// let stream = stream.try_skip_while(|x| future::ready(Ok(*x < 3)));
446 ///
447 /// let output: Result<Vec<i32>, i32> = stream.try_collect().await;
448 /// assert_eq!(output, Ok(vec![3, 2]));
449 /// # })
450 /// ```
451 fn try_skip_while<Fut, F>(self, f: F) -> TrySkipWhile<Self, Fut, F>
452 where
453 F: FnMut(&Self::Ok) -> Fut,
454 Fut: TryFuture<Ok = bool, Error = Self::Error>,
455 Self: Sized,
456 {
457 assert_stream::<Result<Self::Ok, Self::Error>, _>(TrySkipWhile::new(self, f))
458 }
459
460 /// Take elements on this stream while the provided asynchronous predicate
461 /// resolves to `true`.
462 ///
463 /// This function is similar to
464 /// [`StreamExt::take_while`](crate::stream::StreamExt::take_while) but exits
465 /// early if an error occurs.
466 ///
467 /// # Examples
468 ///
469 /// ```
470 /// # futures::executor::block_on(async {
471 /// use futures::future;
472 /// use futures::stream::{self, TryStreamExt};
473 ///
474 /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Ok(2)]);
475 /// let stream = stream.try_take_while(|x| future::ready(Ok(*x < 3)));
476 ///
477 /// let output: Result<Vec<i32>, i32> = stream.try_collect().await;
478 /// assert_eq!(output, Ok(vec![1, 2]));
479 /// # })
480 /// ```
481 fn try_take_while<Fut, F>(self, f: F) -> TryTakeWhile<Self, Fut, F>
482 where
483 F: FnMut(&Self::Ok) -> Fut,
484 Fut: TryFuture<Ok = bool, Error = Self::Error>,
485 Self: Sized,
486 {
487 assert_stream::<Result<Self::Ok, Self::Error>, _>(TryTakeWhile::new(self, f))
488 }
489
490 /// Attempts to run this stream to completion, executing the provided asynchronous
491 /// closure for each element on the stream concurrently as elements become
492 /// available, exiting as soon as an error occurs.
493 ///
494 /// This is similar to
495 /// [`StreamExt::for_each_concurrent`](crate::stream::StreamExt::for_each_concurrent),
496 /// but will resolve to an error immediately if the underlying stream or the provided
497 /// closure return an error.
498 ///
499 /// This method is only available when the `std` or `alloc` feature of this
500 /// library is activated, and it is activated by default.
501 ///
502 /// # Examples
503 ///
504 /// ```
505 /// # futures::executor::block_on(async {
506 /// use futures::channel::oneshot;
507 /// use futures::stream::{self, StreamExt, TryStreamExt};
508 ///
509 /// let (tx1, rx1) = oneshot::channel();
510 /// let (tx2, rx2) = oneshot::channel();
511 /// let (_tx3, rx3) = oneshot::channel();
512 ///
513 /// let stream = stream::iter(vec![rx1, rx2, rx3]);
514 /// let fut = stream.map(Ok).try_for_each_concurrent(
515 /// /* limit */ 2,
516 /// |rx| async move {
517 /// let res: Result<(), oneshot::Canceled> = rx.await;
518 /// res
519 /// }
520 /// );
521 ///
522 /// tx1.send(()).unwrap();
523 /// // Drop the second sender so that `rx2` resolves to `Canceled`.
524 /// drop(tx2);
525 ///
526 /// // The final result is an error because the second future
527 /// // resulted in an error.
528 /// assert_eq!(Err(oneshot::Canceled), fut.await);
529 /// # })
530 /// ```
531 #[cfg(not(futures_no_atomic_cas))]
532 #[cfg(feature = "alloc")]
533 fn try_for_each_concurrent<Fut, F>(
534 self,
535 limit: impl Into<Option<usize>>,
536 f: F,
537 ) -> TryForEachConcurrent<Self, Fut, F>
538 where
539 F: FnMut(Self::Ok) -> Fut,
540 Fut: Future<Output = Result<(), Self::Error>>,
541 Self: Sized,
542 {
543 assert_future::<Result<(), Self::Error>, _>(TryForEachConcurrent::new(
544 self,
545 limit.into(),
546 f,
547 ))
548 }
549
550 /// Attempt to transform a stream into a collection,
551 /// returning a future representing the result of that computation.
552 ///
553 /// This combinator will collect all successful results of this stream and
554 /// collect them into the specified collection type. If an error happens then all
555 /// collected elements will be dropped and the error will be returned.
556 ///
557 /// The returned future will be resolved when the stream terminates.
558 ///
559 /// # Examples
560 ///
561 /// ```
562 /// # futures::executor::block_on(async {
563 /// use futures::channel::mpsc;
564 /// use futures::stream::TryStreamExt;
565 /// use std::thread;
566 ///
567 /// let (tx, rx) = mpsc::unbounded();
568 ///
569 /// thread::spawn(move || {
570 /// for i in 1..=5 {
571 /// tx.unbounded_send(Ok(i)).unwrap();
572 /// }
573 /// tx.unbounded_send(Err(6)).unwrap();
574 /// });
575 ///
576 /// let output: Result<Vec<i32>, i32> = rx.try_collect().await;
577 /// assert_eq!(output, Err(6));
578 /// # })
579 /// ```
580 fn try_collect<C: Default + Extend<Self::Ok>>(self) -> TryCollect<Self, C>
581 where
582 Self: Sized,
583 {
584 assert_future::<Result<C, Self::Error>, _>(TryCollect::new(self))
585 }
586
587 /// An adaptor for chunking up successful items of the stream inside a vector.
588 ///
589 /// This combinator will attempt to pull successful items from this stream and buffer
590 /// them into a local vector. At most `capacity` items will get buffered
591 /// before they're yielded from the returned stream.
592 ///
593 /// Note that the vectors returned from this iterator may not always have
594 /// `capacity` elements. If the underlying stream ended and only a partial
595 /// vector was created, it'll be returned. Additionally if an error happens
596 /// from the underlying stream then the currently buffered items will be
597 /// yielded.
598 ///
599 /// This method is only available when the `std` or `alloc` feature of this
600 /// library is activated, and it is activated by default.
601 ///
602 /// This function is similar to
603 /// [`StreamExt::chunks`](crate::stream::StreamExt::chunks) but exits
604 /// early if an error occurs.
605 ///
606 /// # Examples
607 ///
608 /// ```
609 /// # futures::executor::block_on(async {
610 /// use futures::stream::{self, TryChunksError, TryStreamExt};
611 ///
612 /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Err(4), Ok(5), Ok(6)]);
613 /// let mut stream = stream.try_chunks(2);
614 ///
615 /// assert_eq!(stream.try_next().await, Ok(Some(vec![1, 2])));
616 /// assert_eq!(stream.try_next().await, Err(TryChunksError(vec![3], 4)));
617 /// assert_eq!(stream.try_next().await, Ok(Some(vec![5, 6])));
618 /// # })
619 /// ```
620 ///
621 /// # Panics
622 ///
623 /// This method will panic if `capacity` is zero.
624 #[cfg(feature = "alloc")]
625 fn try_chunks(self, capacity: usize) -> TryChunks<Self>
626 where
627 Self: Sized,
628 {
629 assert_stream::<Result<Vec<Self::Ok>, TryChunksError<Self::Ok, Self::Error>>, _>(
630 TryChunks::new(self, capacity),
631 )
632 }
633
634 /// Attempt to filter the values produced by this stream according to the
635 /// provided asynchronous closure.
636 ///
637 /// As values of this stream are made available, the provided predicate `f`
638 /// will be run on them. If the predicate returns a `Future` which resolves
639 /// to `true`, then the stream will yield the value, but if the predicate
640 /// return a `Future` which resolves to `false`, then the value will be
641 /// discarded and the next value will be produced.
642 ///
643 /// All errors are passed through without filtering in this combinator.
644 ///
645 /// Note that this function consumes the stream passed into it and returns a
646 /// wrapped version of it, similar to the existing `filter` methods in
647 /// the standard library.
648 ///
649 /// # Examples
650 /// ```
651 /// # futures::executor::block_on(async {
652 /// use futures::future;
653 /// use futures::stream::{self, StreamExt, TryStreamExt};
654 ///
655 /// let stream = stream::iter(vec![Ok(1i32), Ok(2i32), Ok(3i32), Err("error")]);
656 /// let mut evens = stream.try_filter(|x| {
657 /// future::ready(x % 2 == 0)
658 /// });
659 ///
660 /// assert_eq!(evens.next().await, Some(Ok(2)));
661 /// assert_eq!(evens.next().await, Some(Err("error")));
662 /// # })
663 /// ```
664 fn try_filter<Fut, F>(self, f: F) -> TryFilter<Self, Fut, F>
665 where
666 Fut: Future<Output = bool>,
667 F: FnMut(&Self::Ok) -> Fut,
668 Self: Sized,
669 {
670 assert_stream::<Result<Self::Ok, Self::Error>, _>(TryFilter::new(self, f))
671 }
672
673 /// Attempt to filter the values produced by this stream while
674 /// simultaneously mapping them to a different type according to the
675 /// provided asynchronous closure.
676 ///
677 /// As values of this stream are made available, the provided function will
678 /// be run on them. If the future returned by the predicate `f` resolves to
679 /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
680 /// it resolves to [`None`] then the next value will be produced.
681 ///
682 /// All errors are passed through without filtering in this combinator.
683 ///
684 /// Note that this function consumes the stream passed into it and returns a
685 /// wrapped version of it, similar to the existing `filter_map` methods in
686 /// the standard library.
687 ///
688 /// # Examples
689 /// ```
690 /// # futures::executor::block_on(async {
691 /// use futures::stream::{self, StreamExt, TryStreamExt};
692 /// use futures::pin_mut;
693 ///
694 /// let stream = stream::iter(vec![Ok(1i32), Ok(6i32), Err("error")]);
695 /// let halves = stream.try_filter_map(|x| async move {
696 /// let ret = if x % 2 == 0 { Some(x / 2) } else { None };
697 /// Ok(ret)
698 /// });
699 ///
700 /// pin_mut!(halves);
701 /// assert_eq!(halves.next().await, Some(Ok(3)));
702 /// assert_eq!(halves.next().await, Some(Err("error")));
703 /// # })
704 /// ```
705 fn try_filter_map<Fut, F, T>(self, f: F) -> TryFilterMap<Self, Fut, F>
706 where
707 Fut: TryFuture<Ok = Option<T>, Error = Self::Error>,
708 F: FnMut(Self::Ok) -> Fut,
709 Self: Sized,
710 {
711 assert_stream::<Result<T, Self::Error>, _>(TryFilterMap::new(self, f))
712 }
713
714 /// Flattens a stream of streams into just one continuous stream.
715 ///
716 /// If this stream's elements are themselves streams then this combinator
717 /// will flatten out the entire stream to one long chain of elements. Any
718 /// errors are passed through without looking at them, but otherwise each
719 /// individual stream will get exhausted before moving on to the next.
720 ///
721 /// # Examples
722 ///
723 /// ```
724 /// # futures::executor::block_on(async {
725 /// use futures::channel::mpsc;
726 /// use futures::stream::{StreamExt, TryStreamExt};
727 /// use std::thread;
728 ///
729 /// let (tx1, rx1) = mpsc::unbounded();
730 /// let (tx2, rx2) = mpsc::unbounded();
731 /// let (tx3, rx3) = mpsc::unbounded();
732 ///
733 /// thread::spawn(move || {
734 /// tx1.unbounded_send(Ok(1)).unwrap();
735 /// });
736 /// thread::spawn(move || {
737 /// tx2.unbounded_send(Ok(2)).unwrap();
738 /// tx2.unbounded_send(Err(3)).unwrap();
739 /// tx2.unbounded_send(Ok(4)).unwrap();
740 /// });
741 /// thread::spawn(move || {
742 /// tx3.unbounded_send(Ok(rx1)).unwrap();
743 /// tx3.unbounded_send(Ok(rx2)).unwrap();
744 /// tx3.unbounded_send(Err(5)).unwrap();
745 /// });
746 ///
747 /// let mut stream = rx3.try_flatten();
748 /// assert_eq!(stream.next().await, Some(Ok(1)));
749 /// assert_eq!(stream.next().await, Some(Ok(2)));
750 /// assert_eq!(stream.next().await, Some(Err(3)));
751 /// assert_eq!(stream.next().await, Some(Ok(4)));
752 /// assert_eq!(stream.next().await, Some(Err(5)));
753 /// assert_eq!(stream.next().await, None);
754 /// # });
755 /// ```
756 fn try_flatten(self) -> TryFlatten<Self>
757 where
758 Self::Ok: TryStream,
759 <Self::Ok as TryStream>::Error: From<Self::Error>,
760 Self: Sized,
761 {
762 assert_stream::<Result<<Self::Ok as TryStream>::Ok, <Self::Ok as TryStream>::Error>, _>(
763 TryFlatten::new(self),
764 )
765 }
766
767 /// Attempt to execute an accumulating asynchronous computation over a
768 /// stream, collecting all the values into one final result.
769 ///
770 /// This combinator will accumulate all values returned by this stream
771 /// according to the closure provided. The initial state is also provided to
772 /// this method and then is returned again by each execution of the closure.
773 /// Once the entire stream has been exhausted the returned future will
774 /// resolve to this value.
775 ///
776 /// This method is similar to [`fold`](crate::stream::StreamExt::fold), but will
777 /// exit early if an error is encountered in either the stream or the
778 /// provided closure.
779 ///
780 /// # Examples
781 ///
782 /// ```
783 /// # futures::executor::block_on(async {
784 /// use futures::stream::{self, TryStreamExt};
785 ///
786 /// let number_stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2)]);
787 /// let sum = number_stream.try_fold(0, |acc, x| async move { Ok(acc + x) });
788 /// assert_eq!(sum.await, Ok(3));
789 ///
790 /// let number_stream_with_err = stream::iter(vec![Ok::<i32, i32>(1), Err(2), Ok(1)]);
791 /// let sum = number_stream_with_err.try_fold(0, |acc, x| async move { Ok(acc + x) });
792 /// assert_eq!(sum.await, Err(2));
793 /// # })
794 /// ```
795 fn try_fold<T, Fut, F>(self, init: T, f: F) -> TryFold<Self, Fut, T, F>
796 where
797 F: FnMut(T, Self::Ok) -> Fut,
798 Fut: TryFuture<Ok = T, Error = Self::Error>,
799 Self: Sized,
800 {
801 assert_future::<Result<T, Self::Error>, _>(TryFold::new(self, f, init))
802 }
803
804 /// Attempt to concatenate all items of a stream into a single
805 /// extendable destination, returning a future representing the end result.
806 ///
807 /// This combinator will extend the first item with the contents of all
808 /// the subsequent successful results of the stream. If the stream is empty,
809 /// the default value will be returned.
810 ///
811 /// Works with all collections that implement the [`Extend`](std::iter::Extend) trait.
812 ///
813 /// This method is similar to [`concat`](crate::stream::StreamExt::concat), but will
814 /// exit early if an error is encountered in the stream.
815 ///
816 /// # Examples
817 ///
818 /// ```
819 /// # futures::executor::block_on(async {
820 /// use futures::channel::mpsc;
821 /// use futures::stream::TryStreamExt;
822 /// use std::thread;
823 ///
824 /// let (tx, rx) = mpsc::unbounded::<Result<Vec<i32>, ()>>();
825 ///
826 /// thread::spawn(move || {
827 /// for i in (0..3).rev() {
828 /// let n = i * 3;
829 /// tx.unbounded_send(Ok(vec![n + 1, n + 2, n + 3])).unwrap();
830 /// }
831 /// });
832 ///
833 /// let result = rx.try_concat().await;
834 ///
835 /// assert_eq!(result, Ok(vec![7, 8, 9, 4, 5, 6, 1, 2, 3]));
836 /// # });
837 /// ```
838 fn try_concat(self) -> TryConcat<Self>
839 where
840 Self: Sized,
841 Self::Ok: Extend<<<Self as TryStream>::Ok as IntoIterator>::Item> + IntoIterator + Default,
842 {
843 assert_future::<Result<Self::Ok, Self::Error>, _>(TryConcat::new(self))
844 }
845
846 /// Attempt to execute several futures from a stream concurrently (unordered).
847 ///
848 /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type
849 /// that matches the stream's `Error` type.
850 ///
851 /// This adaptor will buffer up to `n` futures and then return their
852 /// outputs in the order in which they complete. If the underlying stream
853 /// returns an error, it will be immediately propagated.
854 ///
855 /// The returned stream will be a stream of results, each containing either
856 /// an error or a future's output. An error can be produced either by the
857 /// underlying stream itself or by one of the futures it yielded.
858 ///
859 /// This method is only available when the `std` or `alloc` feature of this
860 /// library is activated, and it is activated by default.
861 ///
862 /// # Examples
863 ///
864 /// Results are returned in the order of completion:
865 /// ```
866 /// # futures::executor::block_on(async {
867 /// use futures::channel::oneshot;
868 /// use futures::stream::{self, StreamExt, TryStreamExt};
869 ///
870 /// let (send_one, recv_one) = oneshot::channel();
871 /// let (send_two, recv_two) = oneshot::channel();
872 ///
873 /// let stream_of_futures = stream::iter(vec![Ok(recv_one), Ok(recv_two)]);
874 ///
875 /// let mut buffered = stream_of_futures.try_buffer_unordered(10);
876 ///
877 /// send_two.send(2i32)?;
878 /// assert_eq!(buffered.next().await, Some(Ok(2i32)));
879 ///
880 /// send_one.send(1i32)?;
881 /// assert_eq!(buffered.next().await, Some(Ok(1i32)));
882 ///
883 /// assert_eq!(buffered.next().await, None);
884 /// # Ok::<(), i32>(()) }).unwrap();
885 /// ```
886 ///
887 /// Errors from the underlying stream itself are propagated:
888 /// ```
889 /// # futures::executor::block_on(async {
890 /// use futures::channel::mpsc;
891 /// use futures::stream::{StreamExt, TryStreamExt};
892 ///
893 /// let (sink, stream_of_futures) = mpsc::unbounded();
894 /// let mut buffered = stream_of_futures.try_buffer_unordered(10);
895 ///
896 /// sink.unbounded_send(Ok(async { Ok(7i32) }))?;
897 /// assert_eq!(buffered.next().await, Some(Ok(7i32)));
898 ///
899 /// sink.unbounded_send(Err("error in the stream"))?;
900 /// assert_eq!(buffered.next().await, Some(Err("error in the stream")));
901 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
902 /// ```
903 #[cfg(not(futures_no_atomic_cas))]
904 #[cfg(feature = "alloc")]
905 fn try_buffer_unordered(self, n: usize) -> TryBufferUnordered<Self>
906 where
907 Self::Ok: TryFuture<Error = Self::Error>,
908 Self: Sized,
909 {
910 assert_stream::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>(
911 TryBufferUnordered::new(self, n),
912 )
913 }
914
915 /// Attempt to execute several futures from a stream concurrently.
916 ///
917 /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type
918 /// that matches the stream's `Error` type.
919 ///
920 /// This adaptor will buffer up to `n` futures and then return their
921 /// outputs in the same order as the underlying stream. If the underlying stream returns an error, it will
922 /// be immediately propagated.
923 ///
924 /// The returned stream will be a stream of results, each containing either
925 /// an error or a future's output. An error can be produced either by the
926 /// underlying stream itself or by one of the futures it yielded.
927 ///
928 /// This method is only available when the `std` or `alloc` feature of this
929 /// library is activated, and it is activated by default.
930 ///
931 /// # Examples
932 ///
933 /// Results are returned in the order of addition:
934 /// ```
935 /// # futures::executor::block_on(async {
936 /// use futures::channel::oneshot;
937 /// use futures::future::lazy;
938 /// use futures::stream::{self, StreamExt, TryStreamExt};
939 ///
940 /// let (send_one, recv_one) = oneshot::channel();
941 /// let (send_two, recv_two) = oneshot::channel();
942 ///
943 /// let mut buffered = lazy(move |cx| {
944 /// let stream_of_futures = stream::iter(vec![Ok(recv_one), Ok(recv_two)]);
945 ///
946 /// let mut buffered = stream_of_futures.try_buffered(10);
947 ///
948 /// assert!(buffered.try_poll_next_unpin(cx).is_pending());
949 ///
950 /// send_two.send(2i32)?;
951 /// assert!(buffered.try_poll_next_unpin(cx).is_pending());
952 /// Ok::<_, i32>(buffered)
953 /// }).await?;
954 ///
955 /// send_one.send(1i32)?;
956 /// assert_eq!(buffered.next().await, Some(Ok(1i32)));
957 /// assert_eq!(buffered.next().await, Some(Ok(2i32)));
958 ///
959 /// assert_eq!(buffered.next().await, None);
960 /// # Ok::<(), i32>(()) }).unwrap();
961 /// ```
962 ///
963 /// Errors from the underlying stream itself are propagated:
964 /// ```
965 /// # futures::executor::block_on(async {
966 /// use futures::channel::mpsc;
967 /// use futures::stream::{StreamExt, TryStreamExt};
968 ///
969 /// let (sink, stream_of_futures) = mpsc::unbounded();
970 /// let mut buffered = stream_of_futures.try_buffered(10);
971 ///
972 /// sink.unbounded_send(Ok(async { Ok(7i32) }))?;
973 /// assert_eq!(buffered.next().await, Some(Ok(7i32)));
974 ///
975 /// sink.unbounded_send(Err("error in the stream"))?;
976 /// assert_eq!(buffered.next().await, Some(Err("error in the stream")));
977 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
978 /// ```
979 #[cfg(not(futures_no_atomic_cas))]
980 #[cfg(feature = "alloc")]
981 fn try_buffered(self, n: usize) -> TryBuffered<Self>
982 where
983 Self::Ok: TryFuture<Error = Self::Error>,
984 Self: Sized,
985 {
986 assert_stream::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>(TryBuffered::new(
987 self, n,
988 ))
989 }
990
991 // TODO: false positive warning from rustdoc. Verify once #43466 settles
992 //
993 /// A convenience method for calling [`TryStream::try_poll_next`] on [`Unpin`]
994 /// stream types.
995 fn try_poll_next_unpin(
996 &mut self,
997 cx: &mut Context<'_>,
998 ) -> Poll<Option<Result<Self::Ok, Self::Error>>>
999 where
1000 Self: Unpin,
1001 {
1002 Pin::new(self).try_poll_next(cx)
1003 }
1004
1005 /// Wraps a [`TryStream`] into a stream compatible with libraries using
1006 /// futures 0.1 `Stream`. Requires the `compat` feature to be enabled.
1007 /// ```
1008 /// # if cfg!(miri) { return; } // Miri does not support epoll
1009 /// use futures::future::{FutureExt, TryFutureExt};
1010 /// # let (tx, rx) = futures::channel::oneshot::channel();
1011 ///
1012 /// let future03 = async {
1013 /// println!("Running on the pool");
1014 /// tx.send(42).unwrap();
1015 /// };
1016 ///
1017 /// let future01 = future03
1018 /// .unit_error() // Make it a TryFuture
1019 /// .boxed() // Make it Unpin
1020 /// .compat();
1021 ///
1022 /// tokio::run(future01);
1023 /// # assert_eq!(42, futures::executor::block_on(rx).unwrap());
1024 /// ```
1025 #[cfg(feature = "compat")]
1026 #[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
1027 fn compat(self) -> Compat<Self>
1028 where
1029 Self: Sized + Unpin,
1030 {
1031 Compat::new(self)
1032 }
1033
1034 /// Adapter that converts this stream into an [`AsyncBufRead`](crate::io::AsyncBufRead).
1035 ///
1036 /// This method is only available when the `std` feature of this
1037 /// library is activated, and it is activated by default.
1038 ///
1039 /// # Examples
1040 ///
1041 /// ```
1042 /// # futures::executor::block_on(async {
1043 /// use futures::stream::{self, TryStreamExt};
1044 /// use futures::io::AsyncReadExt;
1045 ///
1046 /// let stream = stream::iter([Ok(vec![1, 2, 3]), Ok(vec![4, 5])]);
1047 /// let mut reader = stream.into_async_read();
1048 ///
1049 /// let mut buf = Vec::new();
1050 /// reader.read_to_end(&mut buf).await.unwrap();
1051 /// assert_eq!(buf, [1, 2, 3, 4, 5]);
1052 /// # })
1053 /// ```
1054 #[cfg(feature = "io")]
1055 #[cfg_attr(docsrs, doc(cfg(feature = "io")))]
1056 #[cfg(feature = "std")]
1057 fn into_async_read(self) -> IntoAsyncRead<Self>
1058 where
1059 Self: Sized + TryStreamExt<Error = std::io::Error>,
1060 Self::Ok: AsRef<[u8]>,
1061 {
1062 crate::io::assert_read(IntoAsyncRead::new(self))
1063 }
1064}