buf_list/
imp.rs

1// Copyright (c) 2018 the linkerd2-proxy authors
2// Copyright (c) The buf-list Contributors
3// SPDX-License-Identifier: Apache-2.0
4
5use bytes::{Buf, BufMut, Bytes, BytesMut};
6use std::{
7    collections::VecDeque,
8    io::IoSlice,
9    iter::{FromIterator, FusedIterator},
10    sync::OnceLock,
11};
12
13/// Data composed of a list of [`Bytes`] chunks.
14///
15/// For more, see the [crate documentation](crate).
16#[derive(Clone, Debug, Default)]
17pub struct BufList {
18    // Invariant: none of the bufs in this queue are zero-length.
19    bufs: VecDeque<Bytes>,
20
21    /// An index of chunks and their start positions. There's an additional index at the end, which
22    /// is the length of the list (list.num_bytes()).
23    start_pos: OnceLock<Box<[u64]>>,
24}
25
26impl BufList {
27    /// Creates a new, empty, `BufList`.
28    #[inline]
29    pub fn new() -> Self {
30        Self::default()
31    }
32
33    #[inline]
34    pub(crate) fn get_start_pos(&self) -> &[u64] {
35        self.start_pos.get_or_init(|| {
36            let mut start_pos = Vec::with_capacity(self.bufs.len() + 1);
37            let mut next = 0u64;
38            for chunk in self.bufs.iter() {
39                start_pos.push(next);
40                next += chunk.len() as u64;
41            }
42            // Add the length of the chunk at the end.
43            start_pos.push(next);
44            start_pos.into_boxed_slice()
45        })
46    }
47
48    /// Creates a new, empty, `BufList` with the given capacity.
49    #[inline]
50    pub fn with_capacity(capacity: usize) -> Self {
51        Self {
52            bufs: VecDeque::with_capacity(capacity),
53            start_pos: OnceLock::new(),
54        }
55    }
56
57    /// Returns the total number of chunks in this `BufList`.
58    ///
59    /// # Examples
60    ///
61    /// ```
62    /// use buf_list::BufList;
63    ///
64    /// let buf_list = vec![&b"hello"[..], &b"world"[..]].into_iter().collect::<BufList>();
65    /// assert_eq!(buf_list.num_chunks(), 2);
66    /// ```
67    #[inline]
68    pub fn num_chunks(&self) -> usize {
69        self.bufs.len()
70    }
71
72    /// Returns the total number of bytes across all chunks.
73    ///
74    /// # Examples
75    ///
76    /// ```
77    /// use buf_list::BufList;
78    ///
79    /// let buf_list = vec![&b"hello"[..], &b"world"[..]].into_iter().collect::<BufList>();
80    /// assert_eq!(buf_list.num_bytes(), 10);
81    /// ```
82    #[inline]
83    pub fn num_bytes(&self) -> usize {
84        self.remaining()
85    }
86
87    /// Provides a reference to the chunk at the given index.
88    ///
89    /// # Examples
90    ///
91    /// ```
92    /// use buf_list::BufList;
93    /// use bytes::Bytes;
94    ///
95    /// let buf_list = vec![&b"hello"[..], &b"world"[..]].into_iter().collect::<BufList>();
96    /// assert_eq!(buf_list.get_chunk(1), Some(&Bytes::from(&b"world"[..])));
97    /// ```
98    #[inline]
99    pub fn get_chunk(&self, index: usize) -> Option<&Bytes> {
100        self.bufs.get(index)
101    }
102
103    /// Iterates over the chunks in this list.
104    #[inline]
105    pub fn iter(&self) -> Iter<'_> {
106        Iter {
107            iter: self.bufs.iter(),
108        }
109    }
110
111    /// Adds a new chunk to this list.
112    ///
113    /// If the provided [`Buf`] is zero-length, it will not be added to the list.
114    ///
115    /// # Examples
116    ///
117    /// ```
118    /// use buf_list::BufList;
119    /// use bytes::{Buf, Bytes};
120    ///
121    /// let mut buf_list = BufList::new();
122    ///
123    /// // &'static [u8] implements Buf.
124    /// buf_list.push_chunk(&b"hello"[..]);
125    /// assert_eq!(buf_list.chunk(), &b"hello"[..]);
126    ///
127    /// // Bytes also implements Buf.
128    /// buf_list.push_chunk(Bytes::from_static(&b"world"[..]));
129    /// assert_eq!(buf_list.num_chunks(), 2);
130    ///
131    /// // A zero-length `Buf` will not be added to the list.
132    /// buf_list.push_chunk(Bytes::new());
133    /// assert_eq!(buf_list.num_chunks(), 2);
134    /// ```
135    pub fn push_chunk<B: Buf>(&mut self, mut data: B) -> Bytes {
136        // mutable borrow acquired, invalidate the OnceLock
137        self.start_pos = OnceLock::new();
138
139        let len = data.remaining();
140        // `data` is (almost) certainly a `Bytes`, so `copy_to_bytes` should
141        // internally be a cheap refcount bump almost all of the time.
142        // But, if it isn't, this will copy it to a `Bytes` that we can
143        // now clone.
144        let bytes = data.copy_to_bytes(len);
145
146        // Buffer a clone. Don't push zero-length bufs to uphold the invariant.
147        if len > 0 {
148            self.bufs.push_back(bytes.clone());
149        }
150
151        // Return the bytes
152        bytes
153    }
154}
155
156impl<B: Buf> Extend<B> for BufList {
157    fn extend<T: IntoIterator<Item = B>>(&mut self, iter: T) {
158        // mutable borrow acquired, invalidate the OnceLock
159        self.start_pos = OnceLock::new();
160
161        for buf in iter.into_iter() {
162            self.push_chunk(buf);
163        }
164    }
165}
166
167impl<B: Buf> FromIterator<B> for BufList {
168    fn from_iter<T: IntoIterator<Item = B>>(iter: T) -> Self {
169        let mut buf_list = BufList::new();
170        for buf in iter.into_iter() {
171            buf_list.push_chunk(buf);
172        }
173        buf_list
174    }
175}
176
177impl IntoIterator for BufList {
178    type Item = Bytes;
179    type IntoIter = IntoIter;
180
181    #[inline]
182    fn into_iter(self) -> Self::IntoIter {
183        IntoIter {
184            iter: self.bufs.into_iter(),
185        }
186    }
187}
188
189impl<'a> IntoIterator for &'a BufList {
190    type Item = &'a Bytes;
191    type IntoIter = Iter<'a>;
192
193    #[inline]
194    fn into_iter(self) -> Self::IntoIter {
195        self.iter()
196    }
197}
198
199impl AsRef<BufList> for BufList {
200    fn as_ref(&self) -> &BufList {
201        self
202    }
203}
204
205impl Buf for BufList {
206    fn remaining(&self) -> usize {
207        self.bufs.iter().map(Buf::remaining).sum()
208    }
209
210    fn chunk(&self) -> &[u8] {
211        self.bufs.front().map(Buf::chunk).unwrap_or(&[])
212    }
213
214    fn chunks_vectored<'iovs>(&'iovs self, iovs: &mut [IoSlice<'iovs>]) -> usize {
215        // Are there more than zero iovecs to write to?
216        if iovs.is_empty() {
217            return 0;
218        }
219
220        let to_fill = (iovs.len()).min(self.bufs.len());
221        for (i, iov) in iovs.iter_mut().enumerate().take(to_fill) {
222            *iov = IoSlice::new(&self.bufs[i]);
223        }
224
225        to_fill
226    }
227
228    fn advance(&mut self, mut amt: usize) {
229        // mutable borrow acquired, invalidate the OnceLock
230        self.start_pos = OnceLock::new();
231
232        while amt > 0 {
233            let rem = self.bufs[0].remaining();
234            // If the amount to advance by is less than the first buffer in
235            // the buffer list, advance that buffer's cursor by `amt`,
236            // and we're done.
237            if rem > amt {
238                self.bufs[0].advance(amt);
239                return;
240            }
241
242            // Otherwise, advance the first buffer to its end, and
243            // continue.
244            self.bufs[0].advance(rem);
245            amt -= rem;
246
247            self.bufs.pop_front();
248        }
249    }
250
251    fn copy_to_bytes(&mut self, len: usize) -> Bytes {
252        // mutable borrow acquired, invalidate the OnceLock
253        self.start_pos = OnceLock::new();
254
255        // If the length of the requested `Bytes` is <= the length of the front
256        // buffer, we can just use its `copy_to_bytes` implementation (which is
257        // just a reference count bump).
258        match self.bufs.front_mut() {
259            Some(first) if len <= first.remaining() => {
260                let buf = first.copy_to_bytes(len);
261                // If we consumed the first buffer, also advance our "cursor" by
262                // popping it.
263                if first.remaining() == 0 {
264                    self.bufs.pop_front();
265                }
266
267                buf
268            }
269            _ => {
270                assert!(
271                    len <= self.remaining(),
272                    "`len` ({}) greater than remaining ({})",
273                    len,
274                    self.remaining()
275                );
276                let mut buf = BytesMut::with_capacity(len);
277                buf.put(self.take(len));
278                buf.freeze()
279            }
280        }
281    }
282}
283
284impl<T: Into<Bytes>> From<T> for BufList {
285    fn from(value: T) -> Self {
286        let mut buf_list = BufList::with_capacity(1);
287        buf_list.push_chunk(value.into());
288        buf_list
289    }
290}
291
292/// An owned iterator over chunks in a [`BufList`].
293///
294/// Returned by the [`IntoIterator`] implementation for [`BufList`].
295#[derive(Clone, Debug)]
296pub struct IntoIter {
297    iter: std::collections::vec_deque::IntoIter<Bytes>,
298}
299
300impl Iterator for IntoIter {
301    type Item = Bytes;
302
303    #[inline]
304    fn next(&mut self) -> Option<Self::Item> {
305        self.iter.next()
306    }
307
308    #[inline]
309    fn size_hint(&self) -> (usize, Option<usize>) {
310        self.iter.size_hint()
311    }
312}
313
314impl DoubleEndedIterator for IntoIter {
315    #[inline]
316    fn next_back(&mut self) -> Option<Self::Item> {
317        self.iter.next_back()
318    }
319}
320
321impl ExactSizeIterator for IntoIter {
322    #[inline]
323    fn len(&self) -> usize {
324        self.iter.len()
325    }
326}
327
328impl FusedIterator for IntoIter {}
329
330/// A borrowed iterator over chunks in a [`BufList`].
331///
332/// Returned by [`BufList::iter`], and by the [`IntoIterator`] implementation for `&'a BufList`.
333#[derive(Clone, Debug)]
334pub struct Iter<'a> {
335    iter: std::collections::vec_deque::Iter<'a, Bytes>,
336}
337
338impl<'a> Iterator for Iter<'a> {
339    type Item = &'a Bytes;
340
341    #[inline]
342    fn next(&mut self) -> Option<Self::Item> {
343        self.iter.next()
344    }
345
346    // These methods are implemented manually to forward to the underlying
347    // iterator.
348
349    #[inline]
350    fn size_hint(&self) -> (usize, Option<usize>) {
351        self.iter.size_hint()
352    }
353
354    // fold has a special implementation, so forward it.
355    #[inline]
356    fn fold<B, F>(self, init: B, f: F) -> B
357    where
358        Self: Sized,
359        F: FnMut(B, Self::Item) -> B,
360    {
361        self.iter.fold(init, f)
362    }
363
364    // Can't implement try_fold as it uses `std::ops::Try` which isn't stable yet, as of Rust 1.67
365
366    #[inline]
367    fn nth(&mut self, n: usize) -> Option<Self::Item> {
368        self.iter.nth(n)
369    }
370
371    #[inline]
372    fn last(self) -> Option<Self::Item>
373    where
374        Self: Sized,
375    {
376        self.iter.last()
377    }
378}
379
380impl<'a> DoubleEndedIterator for Iter<'a> {
381    #[inline]
382    fn next_back(&mut self) -> Option<Self::Item> {
383        self.iter.next_back()
384    }
385
386    #[inline]
387    fn rfold<B, F>(self, init: B, f: F) -> B
388    where
389        Self: Sized,
390        F: FnMut(B, Self::Item) -> B,
391    {
392        self.iter.rfold(init, f)
393    }
394
395    // Can't implement try_rfold as it uses `std::ops::Try` which isn't stable yet, as of Rust 1.67.
396}
397
398impl<'a> ExactSizeIterator for Iter<'a> {
399    #[inline]
400    fn len(&self) -> usize {
401        self.iter.len()
402    }
403}
404
405impl<'a> FusedIterator for Iter<'a> {}