futures_util/stream/try_stream/
try_chunks.rs1use crate::stream::{Fuse, IntoStream, StreamExt};
2
3use alloc::vec::Vec;
4use core::pin::Pin;
5use core::{fmt, mem};
6use futures_core::ready;
7use futures_core::stream::{FusedStream, Stream, TryStream};
8use futures_core::task::{Context, Poll};
9#[cfg(feature = "sink")]
10use futures_sink::Sink;
11use pin_project_lite::pin_project;
12
13pin_project! {
14 #[derive(Debug)]
16 #[must_use = "streams do nothing unless polled"]
17 pub struct TryChunks<St: TryStream> {
18 #[pin]
19 stream: Fuse<IntoStream<St>>,
20 items: Vec<St::Ok>,
21 cap: usize, }
23}
24
25impl<St: TryStream> TryChunks<St> {
26 pub(super) fn new(stream: St, capacity: usize) -> Self {
27 assert!(capacity > 0);
28
29 Self {
30 stream: IntoStream::new(stream).fuse(),
31 items: Vec::with_capacity(capacity),
32 cap: capacity,
33 }
34 }
35
36 fn take(self: Pin<&mut Self>) -> Vec<St::Ok> {
37 let cap = self.cap;
38 mem::replace(self.project().items, Vec::with_capacity(cap))
39 }
40
41 delegate_access_inner!(stream, St, (. .));
42}
43
44impl<St: TryStream> Stream for TryChunks<St> {
45 #[allow(clippy::type_complexity)]
46 type Item = Result<Vec<St::Ok>, TryChunksError<St::Ok, St::Error>>;
47
48 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
49 let mut this = self.as_mut().project();
50 loop {
51 match ready!(this.stream.as_mut().try_poll_next(cx)) {
52 Some(item) => match item {
56 Ok(item) => {
57 this.items.push(item);
58 if this.items.len() >= *this.cap {
59 return Poll::Ready(Some(Ok(self.take())));
60 }
61 }
62 Err(e) => {
63 return Poll::Ready(Some(Err(TryChunksError(self.take(), e))));
64 }
65 },
66
67 None => {
70 let last = if this.items.is_empty() {
71 None
72 } else {
73 let full_buf = mem::take(this.items);
74 Some(full_buf)
75 };
76
77 return Poll::Ready(last.map(Ok));
78 }
79 }
80 }
81 }
82
83 fn size_hint(&self) -> (usize, Option<usize>) {
84 let chunk_len = if self.items.is_empty() { 0 } else { 1 };
85 let (lower, upper) = self.stream.size_hint();
86 let lower = (lower / self.cap).saturating_add(chunk_len);
87 let upper = match upper {
88 Some(x) => x.checked_add(chunk_len),
89 None => None,
90 };
91 (lower, upper)
92 }
93}
94
95impl<St: TryStream + FusedStream> FusedStream for TryChunks<St> {
96 fn is_terminated(&self) -> bool {
97 self.stream.is_terminated() && self.items.is_empty()
98 }
99}
100
// Forwards the `Sink` implementation of the wrapped stream.
#[cfg(feature = "sink")]
impl<S: TryStream + Sink<Item>, Item> Sink<Item> for TryChunks<S> {
    // Fully qualified because `S` also has an `Error` type through `TryStream`.
    type Error = <S as Sink<Item>>::Error;

    delegate_sink!(stream, Item);
}
111
/// Error yielded by `TryChunks` when the inner stream fails while a batch is
/// being collected.
///
/// Field `0` holds the items buffered before the failure; field `1` is the
/// inner stream's error.
#[derive(PartialEq, Eq)]
pub struct TryChunksError<T, E>(pub Vec<T>, pub E);
117
118impl<T, E: fmt::Debug> fmt::Debug for TryChunksError<T, E> {
119 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
120 self.1.fmt(f)
121 }
122}
123
124impl<T, E: fmt::Display> fmt::Display for TryChunksError<T, E> {
125 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
126 self.1.fmt(f)
127 }
128}
129
// Marker impl only: every `Error` method keeps its default implementation.
#[cfg(feature = "std")]
impl<T, E> std::error::Error for TryChunksError<T, E> where E: fmt::Debug + fmt::Display {}