futures_util/stream/try_stream/
try_chunks.rs

1use crate::stream::{Fuse, IntoStream, StreamExt};
2
3use alloc::vec::Vec;
4use core::pin::Pin;
5use core::{fmt, mem};
6use futures_core::ready;
7use futures_core::stream::{FusedStream, Stream, TryStream};
8use futures_core::task::{Context, Poll};
9#[cfg(feature = "sink")]
10use futures_sink::Sink;
11use pin_project_lite::pin_project;
12
13pin_project! {
14    /// Stream for the [`try_chunks`](super::TryStreamExt::try_chunks) method.
15    #[derive(Debug)]
16    #[must_use = "streams do nothing unless polled"]
17    pub struct TryChunks<St: TryStream> {
18        #[pin]
19        stream: Fuse<IntoStream<St>>,
20        items: Vec<St::Ok>,
21        cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
22    }
23}
24
25impl<St: TryStream> TryChunks<St> {
26    pub(super) fn new(stream: St, capacity: usize) -> Self {
27        assert!(capacity > 0);
28
29        Self {
30            stream: IntoStream::new(stream).fuse(),
31            items: Vec::with_capacity(capacity),
32            cap: capacity,
33        }
34    }
35
36    fn take(self: Pin<&mut Self>) -> Vec<St::Ok> {
37        let cap = self.cap;
38        mem::replace(self.project().items, Vec::with_capacity(cap))
39    }
40
41    delegate_access_inner!(stream, St, (. .));
42}
43
44impl<St: TryStream> Stream for TryChunks<St> {
45    #[allow(clippy::type_complexity)]
46    type Item = Result<Vec<St::Ok>, TryChunksError<St::Ok, St::Error>>;
47
48    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
49        let mut this = self.as_mut().project();
50        loop {
51            match ready!(this.stream.as_mut().try_poll_next(cx)) {
52                // Push the item into the buffer and check whether it is full.
53                // If so, replace our buffer with a new and empty one and return
54                // the full one.
55                Some(item) => match item {
56                    Ok(item) => {
57                        this.items.push(item);
58                        if this.items.len() >= *this.cap {
59                            return Poll::Ready(Some(Ok(self.take())));
60                        }
61                    }
62                    Err(e) => {
63                        return Poll::Ready(Some(Err(TryChunksError(self.take(), e))));
64                    }
65                },
66
67                // Since the underlying stream ran out of values, return what we
68                // have buffered, if we have anything.
69                None => {
70                    let last = if this.items.is_empty() {
71                        None
72                    } else {
73                        let full_buf = mem::take(this.items);
74                        Some(full_buf)
75                    };
76
77                    return Poll::Ready(last.map(Ok));
78                }
79            }
80        }
81    }
82
83    fn size_hint(&self) -> (usize, Option<usize>) {
84        let chunk_len = if self.items.is_empty() { 0 } else { 1 };
85        let (lower, upper) = self.stream.size_hint();
86        let lower = (lower / self.cap).saturating_add(chunk_len);
87        let upper = match upper {
88            Some(x) => x.checked_add(chunk_len),
89            None => None,
90        };
91        (lower, upper)
92    }
93}
94
95impl<St: TryStream + FusedStream> FusedStream for TryChunks<St> {
96    fn is_terminated(&self) -> bool {
97        self.stream.is_terminated() && self.items.is_empty()
98    }
99}
100
// `TryChunks` is also a `Sink` whenever the wrapped stream is one: every sink
// method is forwarded to the `stream` field by `delegate_sink!`.
#[cfg(feature = "sink")]
impl<St: TryStream + Sink<Item>, Item> Sink<Item> for TryChunks<St> {
    // Fully qualified on purpose: `TryStream` has an `Error` type as well.
    type Error = <St as Sink<Item>>::Error;

    delegate_sink!(stream, Item);
}
111
/// Error returned when the inner stream fails while a chunk is being collected.
///
/// Field `0` holds every item gathered before the failure; field `1` is the
/// error produced by the stream.
#[derive(PartialEq, Eq)]
pub struct TryChunksError<T, E>(pub Vec<T>, pub E);

/// Debug output is that of the stream error alone, so `T` does not need to
/// implement `Debug`.
impl<T, E> fmt::Debug for TryChunksError<T, E>
where
    E: fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let TryChunksError(_, cause) = self;
        fmt::Debug::fmt(cause, f)
    }
}

/// Display output is that of the stream error alone.
impl<T, E> fmt::Display for TryChunksError<T, E>
where
    E: fmt::Display,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let TryChunksError(_, cause) = self;
        fmt::Display::fmt(cause, f)
    }
}

#[cfg(feature = "std")]
impl<T, E> std::error::Error for TryChunksError<T, E> where E: fmt::Debug + fmt::Display {}