buf_list/cursor/
tokio_imp.rs

1// Copyright (c) The buf-list Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4use super::CursorData;
5use crate::{BufList, Cursor};
6use std::{
7    io::{self, SeekFrom},
8    pin::Pin,
9    task::{Context, Poll},
10};
11use tokio::io::{AsyncBufRead, AsyncRead, AsyncSeek, ReadBuf};
12
13impl<T: AsRef<BufList> + Unpin> AsyncSeek for Cursor<T> {
14    fn start_seek(mut self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> {
15        io::Seek::seek(&mut *self, pos).map(drop)
16    }
17
18    fn poll_complete(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<u64>> {
19        Poll::Ready(Ok(self.get_mut().position()))
20    }
21}
22
23impl<T: AsRef<BufList> + Unpin> AsyncRead for Cursor<T> {
24    fn poll_read(
25        mut self: Pin<&mut Self>,
26        _: &mut Context<'_>,
27        buf: &mut ReadBuf<'_>,
28    ) -> Poll<io::Result<()>> {
29        let this = &mut *self;
30        this.data.tokio_poll_read_impl(this.inner.as_ref(), buf)
31    }
32}
33
34impl<T: AsRef<BufList> + Unpin> AsyncBufRead for Cursor<T> {
35    fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
36        Poll::Ready(io::BufRead::fill_buf(self.get_mut()))
37    }
38
39    fn consume(mut self: Pin<&mut Self>, amt: usize) {
40        io::BufRead::consume(&mut *self, amt)
41    }
42}
43
44impl CursorData {
45    fn tokio_poll_read_impl(
46        &mut self,
47        list: &BufList,
48        buf: &mut ReadBuf<'_>,
49    ) -> Poll<io::Result<()>> {
50        // This is really similar to Self::read_impl, except it's written against the ReadBuf API.
51        while buf.remaining() > 0 {
52            let (chunk, chunk_pos) = match self.get_chunk_and_pos(list) {
53                Some(value) => value,
54                None => break,
55            };
56            // The number of bytes to copy is the smaller of the two:
57            // - the length of the chunk - the position in it.
58            // - the number of bytes remaining.
59            let n_to_copy = (chunk.len() - chunk_pos).min(buf.remaining());
60            let chunk_bytes = chunk.as_ref();
61
62            let bytes_to_copy = &chunk_bytes[chunk_pos..(chunk_pos + n_to_copy)];
63            buf.put_slice(bytes_to_copy);
64
65            // Increment the position.
66            self.pos += n_to_copy as u64;
67            // If we've finished reading through the chunk, move to the next chunk.
68            if n_to_copy == chunk.len() - chunk_pos {
69                self.chunk += 1;
70            }
71        }
72
73        Poll::Ready(Ok(()))
74    }
75}